This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8a880f51 Support file-based node discovery (#928)
8a880f51 is described below
commit 8a880f51df7a9f843803b84ca8b6e9ff9d2acfca
Author: mrproliu <[email protected]>
AuthorDate: Mon Jan 12 17:01:55 2026 +0800
Support file-based node discovery (#928)
* Support file-based node discovery
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/metadata/client.go | 135 ++++-
banyand/metadata/discovery/common/base.go | 45 ++
banyand/metadata/discovery/common/cache.go | 216 ++++++++
banyand/metadata/discovery/common/grpc.go | 61 ++
banyand/metadata/discovery/common/retry.go | 284 ++++++++++
banyand/metadata/{ => discovery}/dns/dns.go | 325 +++++------
banyand/metadata/{ => discovery}/dns/dns_test.go | 7 +-
.../metadata/{ => discovery}/dns/export_test.go | 12 +-
banyand/metadata/{ => discovery}/dns/metrics.go | 32 ++
.../{ => discovery}/dns/testdata/ca_cert.pem | 0
.../{ => discovery}/dns/testdata/server_key.pem | 0
banyand/metadata/discovery/file/file.go | 466 ++++++++++++++++
banyand/metadata/discovery/file/file_test.go | 616 +++++++++++++++++++++
banyand/metadata/discovery/file/metrics.go | 70 +++
docs/operation/node-discovery.md | 128 ++++-
15 files changed, 2190 insertions(+), 207 deletions(-)
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 0ee2b102..ae5a9d30 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -33,7 +33,8 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata/dns"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/file"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -52,6 +53,8 @@ const (
NodeDiscoveryModeEtcd = "etcd"
// NodeDiscoveryModeDNS represents DNS-based node discovery mode.
NodeDiscoveryModeDNS = "dns"
+ // NodeDiscoveryModeFile represents file-based node discovery mode.
+ NodeDiscoveryModeFile = "file"
)
const flagEtcdUsername = "etcd-username"
@@ -74,30 +77,36 @@ func NewClient(toRegisterNode, forceRegisterNode bool) (Service, error) {
}
type clientService struct {
- schemaRegistry schema.Registry
- dnsDiscovery *dns.Service
- closer *run.Closer
- nodeInfo *databasev1.Node
- etcdTLSCertFile string
- dnsCACertPaths []string
- etcdPassword string
- etcdTLSCAFile string
- etcdUsername string
- etcdTLSKeyFile string
- namespace string
- nodeDiscoveryMode string
- dnsSRVAddresses []string
- endpoints []string
- registryTimeout time.Duration
- dnsFetchInitInterval time.Duration
- dnsFetchInitDuration time.Duration
- dnsFetchInterval time.Duration
- grpcTimeout time.Duration
- etcdFullSyncInterval time.Duration
- nodeInfoMux sync.Mutex
- forceRegisterNode bool
- toRegisterNode bool
- dnsTLSEnabled bool
+ schemaRegistry schema.Registry
+ dnsDiscovery *dns.Service
+ fileDiscovery *file.Service
+ closer *run.Closer
+ nodeInfo *databasev1.Node
+ etcdTLSCertFile string
+ dnsCACertPaths []string
+ etcdPassword string
+ etcdTLSCAFile string
+ etcdUsername string
+ etcdTLSKeyFile string
+ namespace string
+ nodeDiscoveryMode string
+ filePath string
+ dnsSRVAddresses []string
+ endpoints []string
+ registryTimeout time.Duration
+ dnsFetchInitInterval time.Duration
+ dnsFetchInitDuration time.Duration
+ dnsFetchInterval time.Duration
+ grpcTimeout time.Duration
+ etcdFullSyncInterval time.Duration
+ fileFetchInterval time.Duration
+ fileRetryInitialInterval time.Duration
+ fileRetryMaxInterval time.Duration
+ fileRetryMultiplier float64
+ nodeInfoMux sync.Mutex
+ forceRegisterNode bool
+ toRegisterNode bool
+ dnsTLSEnabled bool
}
func (s *clientService) SchemaRegistry() schema.Registry {
@@ -118,7 +127,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
// node discovery configuration
fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", NodeDiscoveryModeEtcd,
- "Node discovery mode: 'etcd' for etcd-based discovery, 'dns' for DNS-based discovery")
+ "Node discovery mode: 'etcd' for etcd-based discovery, 'dns' for DNS-based discovery, 'file' for file-based discovery")
fs.StringSliceVar(&s.dnsSRVAddresses, "node-discovery-dns-srv-addresses", []string{},
"DNS SRV addresses for node discovery (e.g., _grpc._tcp.banyandb.svc.cluster.local)")
fs.DurationVar(&s.dnsFetchInitInterval, "node-discovery-dns-fetch-init-interval", 5*time.Second,
@@ -133,13 +142,25 @@ func (s *clientService) FlagSet() *run.FlagSet {
"Enable TLS for DNS discovery gRPC connections")
fs.StringSliceVar(&s.dnsCACertPaths, "node-discovery-dns-ca-certs", []string{},
"Comma-separated list of CA certificate files to verify DNS discovered nodes (one per SRV address, in same order)")
+ fs.StringVar(&s.filePath, "node-discovery-file-path", "",
+ "File path for static node configuration (file mode only)")
+ fs.DurationVar(&s.fileFetchInterval, "node-discovery-file-fetch-interval", 5*time.Minute,
+ "Interval to poll the discovery file in file discovery mode (fallback mechanism)")
+ fs.DurationVar(&s.fileRetryInitialInterval, "node-discovery-file-retry-initial-interval", 1*time.Second,
+ "Initial retry interval for failed node metadata fetches in file discovery mode")
+ fs.DurationVar(&s.fileRetryMaxInterval, "node-discovery-file-retry-max-interval", 2*time.Minute,
+ "Maximum retry interval for failed node metadata fetches in file discovery mode")
+ fs.Float64Var(&s.fileRetryMultiplier, "node-discovery-file-retry-multiplier", 2.0,
+ "Backoff multiplier for retry intervals in file discovery mode")
return fs
}
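
[Editor's illustrative sketch, not part of the commit: with the flags registered above, a node could be switched to file-based discovery along these lines; the file path is made up, and the interval values simply restate the defaults shown in the FlagSet.]

    --node-discovery-mode=file \
    --node-discovery-file-path=/etc/banyandb/nodes.yaml \
    --node-discovery-file-fetch-interval=5m \
    --node-discovery-file-retry-initial-interval=1s \
    --node-discovery-file-retry-max-interval=2m \
    --node-discovery-file-retry-multiplier=2.0
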
func (s *clientService) Validate() error {
- if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode != NodeDiscoveryModeDNS {
- return fmt.Errorf("invalid node-discovery-mode: %s, must be '%s' or '%s'", s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, NodeDiscoveryModeDNS)
+ if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode != NodeDiscoveryModeDNS &&
+ s.nodeDiscoveryMode != NodeDiscoveryModeFile {
+ return fmt.Errorf("invalid node-discovery-mode: %s, must be '%s', '%s', or '%s'",
+ s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, NodeDiscoveryModeDNS, NodeDiscoveryModeFile)
}
// Validate etcd endpoints (always required for schema storage, regardless of node discovery mode)
@@ -163,6 +184,16 @@ func (s *clientService) Validate() error {
}
}
+ // Validate file mode specific requirements
+ if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
+ if s.filePath == "" {
+ return errors.New("file mode requires non-empty file
path")
+ }
+ if _, err := os.Stat(s.filePath); err != nil {
+ return fmt.Errorf("file path validation failed: %w",
err)
+ }
+ }
+
return nil
}
@@ -230,8 +261,26 @@ func (s *clientService) PreRun(ctx context.Context) error {
}
}
- // skip node registration if DNS mode is enabled or node registration is disabled
- if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS {
+ if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
+ l.Info().Str("file-path", s.filePath).Msg("Initializing
file-based node discovery")
+
+ var createErr error
+ s.fileDiscovery, createErr = file.NewService(file.Config{
+ FilePath: s.filePath,
+ GRPCTimeout: s.grpcTimeout,
+ FetchInterval: s.fileFetchInterval,
+ RetryInitialInterval: s.fileRetryInitialInterval,
+ RetryMaxInterval: s.fileRetryMaxInterval,
+ RetryMultiplier: s.fileRetryMultiplier,
+ })
+ if createErr != nil {
+ return fmt.Errorf("failed to create file discovery
service: %w", createErr)
+ }
+ }
+
+ // skip node registration if DNS/file mode is enabled or node registration is disabled
+ if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS ||
+ s.nodeDiscoveryMode == NodeDiscoveryModeFile {
return nil
}
val := ctx.Value(common.ContextNodeKey)
@@ -299,6 +348,13 @@ func (s *clientService) Serve() run.StopNotify {
}
}
+ // Start file discovery
+ if s.fileDiscovery != nil {
+ if err := s.fileDiscovery.Start(s.closer.Ctx()); err != nil {
+ logger.GetLogger(s.Name()).Error().Err(err).Msg("failed
to start file discovery")
+ }
+ }
+
return s.closer.CloseNotify()
}
@@ -312,6 +368,12 @@ func (s *clientService) GracefulStop() {
}
}
+ if s.fileDiscovery != nil {
+ if err := s.fileDiscovery.Close(); err != nil {
+ logger.GetLogger(s.Name()).Error().Err(err).Msg("failed
to close file discovery")
+ }
+ }
+
if s.schemaRegistry != nil {
if err := s.schemaRegistry.Close(); err != nil {
logger.GetLogger(s.Name()).Error().Err(err).Msg("failed
to close schema registry")
@@ -320,6 +382,10 @@ func (s *clientService) GracefulStop() {
}
func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
+ if kind == schema.KindNode && s.fileDiscovery != nil {
+ s.fileDiscovery.RegisterHandler(name, handler)
+ return
+ }
if kind == schema.KindNode && s.dnsDiscovery != nil {
s.dnsDiscovery.RegisterHandler(name, handler)
return
@@ -356,6 +422,10 @@ func (s *clientService) TopNAggregationRegistry() schema.TopNAggregation {
}
func (s *clientService) NodeRegistry() schema.Node {
+ // If file discovery is enabled, use it instead of etcd
+ if s.fileDiscovery != nil {
+ return s.fileDiscovery
+ }
// If DNS discovery is enabled, use it instead of etcd
if s.dnsDiscovery != nil {
return s.dnsDiscovery
@@ -374,6 +444,11 @@ func (s *clientService) SetMetricsRegistry(omr observability.MetricsRegistry) {
factory := observability.RootScope.SubScope("metadata").SubScope("dns_discovery")
s.dnsDiscovery.SetMetrics(omr.With(factory))
}
+ // initialize file discovery with metrics if it exists
+ if s.fileDiscovery != nil {
+ factory := observability.RootScope.SubScope("metadata").SubScope("file_discovery")
+ s.fileDiscovery.SetMetrics(omr.With(factory))
+ }
}
func (s *clientService) Name() string {
diff --git a/banyand/metadata/discovery/common/base.go b/banyand/metadata/discovery/common/base.go
new file mode 100644
index 00000000..9ddfe95f
--- /dev/null
+++ b/banyand/metadata/discovery/common/base.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+// DiscoveryServiceBase combines NodeCacheBase and RetryManager for discovery services.
+type DiscoveryServiceBase struct {
+ *NodeCacheBase
+ *RetryManager
+}
+
+// NewServiceBase creates a new base service for gRPC-based and file-based services.
+func NewServiceBase(
+ loggerName string,
+ fetcher NodeFetcher,
+ retryConfig RetryConfig,
+) *DiscoveryServiceBase {
+ cacheBase := NewNodeCacheBase(loggerName)
+ retryManager := NewRetryManager(fetcher, cacheBase, retryConfig, nil)
+ return &DiscoveryServiceBase{
+ NodeCacheBase: cacheBase,
+ RetryManager: retryManager,
+ }
+}
+
+// SetMetrics sets metrics for the retry manager.
+func (b *DiscoveryServiceBase) SetMetrics(metrics RetryMetrics) {
+ if b.RetryManager != nil {
+ b.RetryManager.SetMetrics(metrics)
+ }
+}
diff --git a/banyand/metadata/discovery/common/cache.go b/banyand/metadata/discovery/common/cache.go
new file mode 100644
index 00000000..2dfc268c
--- /dev/null
+++ b/banyand/metadata/discovery/common/cache.go
@@ -0,0 +1,216 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package common provides shared functionality for node discovery implementations.
+package common
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// NodeCacheBase provides common node cache management functionality for discovery services.
+type NodeCacheBase struct {
+ nodeCache map[string]*databasev1.Node
+ log *logger.Logger
+ handlers []schema.EventHandler
+ cacheMutex sync.RWMutex
+ handlersMutex sync.RWMutex
+}
+
+// NewNodeCacheBase creates a new base with initialized maps.
+func NewNodeCacheBase(loggerName string) *NodeCacheBase {
+ return &NodeCacheBase{
+ nodeCache: make(map[string]*databasev1.Node),
+ handlers: make([]schema.EventHandler, 0),
+ log: logger.GetLogger(loggerName),
+ }
+}
+
+// GetLogger returns the logger.
+func (b *NodeCacheBase) GetLogger() *logger.Logger {
+ return b.log
+}
+
+// ListNode lists all nodes from cache, filtering by role.
+func (b *NodeCacheBase) ListNode(_ context.Context, role databasev1.Role) ([]*databasev1.Node, error) {
+ b.cacheMutex.RLock()
+ defer b.cacheMutex.RUnlock()
+
+ var result []*databasev1.Node
+ for _, node := range b.nodeCache {
+ if role != databasev1.Role_ROLE_UNSPECIFIED {
+ hasRole := false
+ for _, nodeRole := range node.GetRoles() {
+ if nodeRole == role {
+ hasRole = true
+ break
+ }
+ }
+ if !hasRole {
+ continue
+ }
+ }
+ result = append(result, node)
+ }
+ return result, nil
+}
+
+// GetNode retrieves a specific node by name.
+func (b *NodeCacheBase) GetNode(_ context.Context, nodeName string) (*databasev1.Node, error) {
+ b.cacheMutex.RLock()
+ defer b.cacheMutex.RUnlock()
+
+ for _, node := range b.nodeCache {
+ if node.GetMetadata() != nil && node.GetMetadata().GetName() == nodeName {
+ return node, nil
+ }
+ }
+ return nil, fmt.Errorf("node %s not found", nodeName)
+}
+
+// RegisterHandler registers an event handler for node changes.
+func (b *NodeCacheBase) RegisterHandler(name string, handler schema.EventHandler) {
+ b.handlersMutex.Lock()
+ defer b.handlersMutex.Unlock()
+
+ b.handlers = append(b.handlers, handler)
+ b.log.Debug().Str("handler", name).Msg("Registered node discovery
handler")
+}
+
+// NotifyHandlers notifies all registered handlers of node changes.
+func (b *NodeCacheBase) NotifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) {
+ b.handlersMutex.RLock()
+ defer b.handlersMutex.RUnlock()
+
+ for _, handler := range b.handlers {
+ if isAddOrUpdate {
+ handler.OnAddOrUpdate(metadata)
+ } else {
+ handler.OnDelete(metadata)
+ }
+ }
+}
+
+// AddNode adds a node to the cache.
+// Returns true if the node was added, false if it already existed.
+func (b *NodeCacheBase) AddNode(address string, node *databasev1.Node) bool {
+ b.cacheMutex.Lock()
+ defer b.cacheMutex.Unlock()
+
+ if _, exists := b.nodeCache[address]; exists {
+ return false
+ }
+ b.nodeCache[address] = node
+ return true
+}
+
+// GetCachedNode retrieves a node from cache by address.
+// Returns the node and true if found, nil and false otherwise.
+func (b *NodeCacheBase) GetCachedNode(address string) (*databasev1.Node, bool) {
+ b.cacheMutex.RLock()
+ defer b.cacheMutex.RUnlock()
+ node, exists := b.nodeCache[address]
+ return node, exists
+}
+
+// GetCacheSize returns the current number of nodes in the cache.
+func (b *NodeCacheBase) GetCacheSize() int {
+ b.cacheMutex.RLock()
+ defer b.cacheMutex.RUnlock()
+ return len(b.nodeCache)
+}
+
+// GetAllNodeAddresses returns all addresses currently in the cache.
+func (b *NodeCacheBase) GetAllNodeAddresses() []string {
+ b.cacheMutex.RLock()
+ defer b.cacheMutex.RUnlock()
+
+ addresses := make([]string, 0, len(b.nodeCache))
+ for addr := range b.nodeCache {
+ addresses = append(addresses, addr)
+ }
+ return addresses
+}
+
+// RemoveNodes removes multiple nodes from the cache.
+// Returns a map of removed nodes keyed by address.
+func (b *NodeCacheBase) RemoveNodes(addresses []string) map[string]*databasev1.Node {
+ b.cacheMutex.Lock()
+ defer b.cacheMutex.Unlock()
+
+ removed := make(map[string]*databasev1.Node)
+ for _, addr := range addresses {
+ if node, exists := b.nodeCache[addr]; exists {
+ delete(b.nodeCache, addr)
+ removed[addr] = node
+ }
+ }
+ return removed
+}
+
+// AddNodeAndNotify adds a node to the cache and notifies handlers if added successfully.
+// Returns true if the node was added, false if it already existed.
+func (b *NodeCacheBase) AddNodeAndNotify(address string, node *databasev1.Node, logMsg string) bool {
+ if added := b.AddNode(address, node); added {
+ // notify handlers
+ b.NotifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ b.log.Debug().
+ Str("address", address).
+ Str("name", node.GetMetadata().GetName()).
+ Msg(logMsg)
+
+ return true
+ }
+ return false
+}
+
+// RemoveNodesAndNotify removes nodes and notifies handlers for each removed node.
+// Returns a map of removed nodes keyed by address.
+func (b *NodeCacheBase) RemoveNodesAndNotify(addresses []string, logMsg string) map[string]*databasev1.Node {
+ removed := b.RemoveNodes(addresses)
+
+ // notify handlers for deletions
+ for addr, node := range removed {
+ b.NotifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, false)
+
+ b.log.Debug().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Msg(logMsg)
+ }
+
+ return removed
+}
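
[Editor's illustrative sketch, not part of the commit: a minimal program exercising NodeCacheBase on its own. The node name, address, and logger name are made up; only the fields the cache actually reads (Metadata.Name, Roles) matter here, and Role_ROLE_UNSPECIFIED skips role filtering as implemented in ListNode above.]

    package main

    import (
        "context"
        "fmt"

        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
    )

    func main() {
        cache := common.NewNodeCacheBase("example-discovery")

        // AddNode returns false for duplicates, so re-adding the same address is a no-op.
        node := &databasev1.Node{Metadata: &commonv1.Metadata{Name: "data-node-0"}}
        fmt.Println(cache.AddNode("10.0.0.1:17912", node)) // true
        fmt.Println(cache.AddNode("10.0.0.1:17912", node)) // false

        // ROLE_UNSPECIFIED returns every cached node without role filtering.
        nodes, _ := cache.ListNode(context.Background(), databasev1.Role_ROLE_UNSPECIFIED)
        fmt.Println(len(nodes), cache.GetCacheSize())

        // RemoveNodes returns the removed entries keyed by address.
        removed := cache.RemoveNodes([]string{"10.0.0.1:17912"})
        fmt.Println(len(removed), cache.GetCacheSize())
    }
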
diff --git a/banyand/metadata/discovery/common/grpc.go b/banyand/metadata/discovery/common/grpc.go
new file mode 100644
index 00000000..198950ce
--- /dev/null
+++ b/banyand/metadata/discovery/common/grpc.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+)
+
+// GRPCDialOptionsProvider provides gRPC dial options for TLS configuration.
+type GRPCDialOptionsProvider interface {
+ GetDialOptions(address string) ([]grpc.DialOption, error)
+}
+
+// FetchNodeMetadata fetches node metadata via gRPC.
+// This is the common implementation used by both DNS and file discovery.
+func FetchNodeMetadata(ctx context.Context, address string, timeout time.Duration, dialOptsProvider GRPCDialOptionsProvider) (*databasev1.Node, error) {
+ ctxTimeout, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+
+ dialOpts, err := dialOptsProvider.GetDialOptions(address)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get dial options for %s: %w",
address, err)
+ }
+
+ // nolint:contextcheck
+ conn, connErr := grpchelper.Conn(address, timeout, dialOpts...)
+ if connErr != nil {
+ return nil, fmt.Errorf("failed to connect to %s: %w", address,
connErr)
+ }
+ defer conn.Close()
+
+ client := databasev1.NewNodeQueryServiceClient(conn)
+ resp, callErr := client.GetCurrentNode(ctxTimeout, &databasev1.GetCurrentNodeRequest{})
+ if callErr != nil {
+ return nil, fmt.Errorf("failed to get current node from %s:
%w", address, callErr)
+ }
+
+ return resp.GetNode(), nil
+}
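
[Editor's illustrative sketch, not part of the commit: a caller of FetchNodeMetadata only needs to supply a GRPCDialOptionsProvider. The plaintext provider below mirrors the insecure branch used by the DNS and file services; the address is hypothetical, 17912 being the default BanyanDB gRPC port.]

    package main

    import (
        "context"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
    )

    // insecureProvider always dials without TLS.
    type insecureProvider struct{}

    func (insecureProvider) GetDialOptions(_ string) ([]grpc.DialOption, error) {
        return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
    }

    func main() {
        node, err := common.FetchNodeMetadata(context.Background(), "10.0.0.1:17912", 5*time.Second, insecureProvider{})
        if err != nil {
            panic(err)
        }
        _ = node
    }
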
diff --git a/banyand/metadata/discovery/common/retry.go b/banyand/metadata/discovery/common/retry.go
new file mode 100644
index 00000000..b419c245
--- /dev/null
+++ b/banyand/metadata/discovery/common/retry.go
@@ -0,0 +1,284 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// NodeFetcher defines the interface for fetching node metadata.
+type NodeFetcher interface {
+ // FetchNodeWithRetry attempts to fetch node metadata for the given address.
+ FetchNodeWithRetry(ctx context.Context, address string) (*databasev1.Node, error)
+}
+
+// RetryMetrics defines metrics interface for retry operations.
+type RetryMetrics interface {
+ IncRetryCount()
+ IncRetrySuccess()
+ IncRetryFailed()
+ SetQueueSize(size float64)
+}
+
+// RetryConfig holds retry backoff configuration.
+type RetryConfig struct {
+ InitialInterval time.Duration
+ MaxInterval time.Duration
+ Multiplier float64
+}
+
+// RetryState tracks backoff retry state for a single node.
+type RetryState struct {
+ nextRetryTime time.Time
+ lastError error
+ address string
+ attemptCount int
+ currentBackoff time.Duration
+}
+
+// RetryManager manages retry queue and scheduling for failed node fetches.
+type RetryManager struct {
+ fetcher NodeFetcher
+ metrics RetryMetrics
+ cacheBase *NodeCacheBase
+ retryQueue map[string]*RetryState
+ log *logger.Logger
+ onSuccess func(address string, node *databasev1.Node)
+ onAddedToNode func(address string)
+ config RetryConfig
+ retryMutex sync.RWMutex
+}
+
+// NewRetryManager creates a new retry manager.
+func NewRetryManager(
+ fetcher NodeFetcher,
+ cacheBase *NodeCacheBase,
+ config RetryConfig,
+ metrics RetryMetrics,
+) *RetryManager {
+ return &RetryManager{
+ fetcher: fetcher,
+ cacheBase: cacheBase,
+ retryQueue: make(map[string]*RetryState),
+ config: config,
+ metrics: metrics,
+ log: cacheBase.GetLogger(),
+ }
+}
+
+// SetSuccessCallback sets the callback to be called when a node is successfully fetched after retry.
+func (r *RetryManager) SetSuccessCallback(callback func(address string, node *databasev1.Node)) {
+ r.onSuccess = callback
+}
+
+// SetAddedToNodeCallback sets the callback to be called when a node is added to cache.
+func (r *RetryManager) SetAddedToNodeCallback(callback func(address string)) {
+ r.onAddedToNode = callback
+}
+
+// SetMetrics sets the metrics for the retry manager.
+func (r *RetryManager) SetMetrics(metrics RetryMetrics) {
+ r.metrics = metrics
+}
+
+// AddToRetry adds a failed node fetch to the retry queue.
+func (r *RetryManager) AddToRetry(address string, fetchErr error) {
+ r.retryMutex.Lock()
+ defer r.retryMutex.Unlock()
+
+ r.retryQueue[address] = &RetryState{
+ address: address,
+ attemptCount: 1,
+ nextRetryTime: time.Now().Add(r.config.InitialInterval),
+ currentBackoff: r.config.InitialInterval,
+ lastError: fetchErr,
+ }
+
+ if r.metrics != nil {
+ r.metrics.IncRetryCount()
+ r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+ }
+
+ r.log.Warn().
+ Err(fetchErr).
+ Str("address", address).
+ Msg("Failed to fetch node metadata, adding to retry queue")
+}
+
+// RemoveFromRetry removes an address from the retry queue.
+func (r *RetryManager) RemoveFromRetry(address string) {
+ r.retryMutex.Lock()
+ defer r.retryMutex.Unlock()
+
+ delete(r.retryQueue, address)
+ if r.metrics != nil {
+ r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+ }
+
+ r.log.Debug().
+ Str("address", address).
+ Msg("Removed node from retry queue")
+}
+
+// IsInRetry checks if an address is in the retry queue.
+func (r *RetryManager) IsInRetry(address string) bool {
+ r.retryMutex.RLock()
+ defer r.retryMutex.RUnlock()
+ _, exists := r.retryQueue[address]
+ return exists
+}
+
+// GetQueueSize returns the current retry queue size.
+func (r *RetryManager) GetQueueSize() int {
+ r.retryMutex.RLock()
+ defer r.retryMutex.RUnlock()
+ return len(r.retryQueue)
+}
+
+// ProcessRetryQueue processes nodes that are ready for retry.
+func (r *RetryManager) ProcessRetryQueue(ctx context.Context) {
+ now := time.Now()
+
+ r.retryMutex.RLock()
+ nodesToRetry := make([]*RetryState, 0)
+ for _, retryState := range r.retryQueue {
+ if retryState.nextRetryTime.Before(now) || retryState.nextRetryTime.Equal(now) {
+ nodesToRetry = append(nodesToRetry, retryState)
+ }
+ }
+ r.retryMutex.RUnlock()
+
+ if len(nodesToRetry) == 0 {
+ return
+ }
+
+ r.log.Debug().Int("count", len(nodesToRetry)).Msg("Processing retry
queue")
+
+ for _, retryState := range nodesToRetry {
+ r.attemptNodeRetry(ctx, retryState)
+ }
+}
+
+func (r *RetryManager) attemptNodeRetry(ctx context.Context, retryState *RetryState) {
+ addr := retryState.address
+
+ r.log.Debug().
+ Str("address", addr).
+ Int("attempt", retryState.attemptCount).
+ Dur("backoff", retryState.currentBackoff).
+ Msg("Attempting node metdata fetch retry")
+
+ // try to fetch metadata
+ node, fetchErr := r.fetcher.FetchNodeWithRetry(ctx, addr)
+
+ if fetchErr != nil {
+ // retry failed - update backoff
+ retryState.attemptCount++
+ retryState.lastError = fetchErr
+
+ // calculate next backoff interval
+ nextBackoff := time.Duration(float64(retryState.currentBackoff) * r.config.Multiplier)
+ if nextBackoff > r.config.MaxInterval {
+ nextBackoff = r.config.MaxInterval
+ }
+ retryState.currentBackoff = nextBackoff
+ retryState.nextRetryTime = time.Now().Add(nextBackoff)
+
+ r.log.Warn().
+ Err(fetchErr).
+ Str("address", addr).
+ Int("attempt", retryState.attemptCount).
+ Dur("next_backoff", nextBackoff).
+ Time("next_retry", retryState.nextRetryTime).
+ Msg("Node metadata fetch retry failed, scheduling next
retry")
+
+ r.retryMutex.Lock()
+ r.retryQueue[addr] = retryState
+ r.retryMutex.Unlock()
+
+ if r.metrics != nil {
+ r.metrics.IncRetryFailed()
+ }
+ return
+ }
+
+ // success - add to cache
+ r.cacheBase.AddNode(addr, node)
+
+ // remove from retry queue
+ r.retryMutex.Lock()
+ delete(r.retryQueue, addr)
+ queueSize := len(r.retryQueue)
+ r.retryMutex.Unlock()
+
+ // notify handlers
+ r.cacheBase.NotifyHandlers(schema.Metadata{
+ TypeMeta: schema.TypeMeta{
+ Kind: schema.KindNode,
+ Name: node.GetMetadata().GetName(),
+ },
+ Spec: node,
+ }, true)
+
+ r.log.Info().
+ Str("address", addr).
+ Str("name", node.GetMetadata().GetName()).
+ Int("total_attempts", retryState.attemptCount).
+ Msg("Node metadata fetched successfully after retry")
+
+ // update metrics
+ if r.metrics != nil {
+ r.metrics.IncRetrySuccess()
+ r.metrics.SetQueueSize(float64(queueSize))
+ }
+
+ // call success callback if set
+ if r.onSuccess != nil {
+ r.onSuccess(addr, node)
+ }
+
+ // call added to node callback if set
+ if r.onAddedToNode != nil {
+ r.onAddedToNode(addr)
+ }
+}
+
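
[Editor's illustrative sketch, not part of the commit: with the flag defaults introduced in client.go (1s initial interval, 2.0 multiplier, 2m cap), the backoff computed in attemptNodeRetry above grows 1s, 2s, 4s, ... until it is clamped at 2m. A standalone rendering of that arithmetic:]

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        initial, maxInterval, multiplier := time.Second, 2*time.Minute, 2.0

        backoff := initial
        for attempt := 1; attempt <= 10; attempt++ {
            fmt.Printf("attempt %d: wait %v\n", attempt, backoff)
            // Same rule as attemptNodeRetry: multiply, then clamp at the maximum interval.
            next := time.Duration(float64(backoff) * multiplier)
            if next > maxInterval {
                next = maxInterval
            }
            backoff = next
        }
        // The printed waits double until they reach the 2m cap and then stay there.
    }
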
+// CleanupRetryQueue removes addresses from retry queue that are not in the provided set.
+func (r *RetryManager) CleanupRetryQueue(validAddresses map[string]bool) {
+ r.retryMutex.Lock()
+ defer r.retryMutex.Unlock()
+
+ for addr := range r.retryQueue {
+ if !validAddresses[addr] {
+ delete(r.retryQueue, addr)
+ r.log.Debug().
+ Str("address", addr).
+ Msg("Removed node from retry queue (no longer
valid)")
+ }
+ }
+
+ if r.metrics != nil {
+ r.metrics.SetQueueSize(float64(len(r.retryQueue)))
+ }
+}
diff --git a/banyand/metadata/dns/dns.go b/banyand/metadata/discovery/dns/dns.go
similarity index 70%
rename from banyand/metadata/dns/dns.go
rename to banyand/metadata/discovery/dns/dns.go
index 2664f5df..fe472f04 100644
--- a/banyand/metadata/dns/dns.go
+++ b/banyand/metadata/discovery/dns/dns.go
@@ -31,25 +31,23 @@ import (
"google.golang.org/grpc/credentials/insecure"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
- "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
)
// Service implements DNS-based node discovery.
type Service struct {
+ *common.DiscoveryServiceBase
lastQueryTime time.Time
resolver Resolver
pathToReloader map[string]*pkgtls.Reloader
srvAddrToPath map[string]string
- nodeCache map[string]*databasev1.Node
+ addressToSrvAddr map[string]string
closer *run.Closer
- log *logger.Logger
metrics *metrics
- handlers []schema.EventHandler
lastSuccessfulDNS map[string][]string
srvAddresses []string
caCertPaths []string
@@ -57,21 +55,23 @@ type Service struct {
initInterval time.Duration
initDuration time.Duration
grpcTimeout time.Duration
- cacheMutex sync.RWMutex
- handlersMutex sync.RWMutex
lastQueryMutex sync.RWMutex
+ addressMutex sync.RWMutex
tlsEnabled bool
}
// Config holds configuration for DNS discovery service.
type Config struct {
- CACertPaths []string
- SRVAddresses []string
- InitInterval time.Duration
- InitDuration time.Duration
- PollInterval time.Duration
- GRPCTimeout time.Duration
- TLSEnabled bool
+ CACertPaths []string
+ SRVAddresses []string
+ InitInterval time.Duration
+ InitDuration time.Duration
+ PollInterval time.Duration
+ GRPCTimeout time.Duration
+ TLSEnabled bool
+ RetryInitialInterval time.Duration
+ RetryMaxInterval time.Duration
+ RetryMultiplier float64
}
// Resolver defines the interface for DNS SRV lookups.
@@ -109,16 +109,25 @@ func NewService(cfg Config) (*Service, error) {
grpcTimeout: cfg.GRPCTimeout,
tlsEnabled: cfg.TLSEnabled,
caCertPaths: cfg.CACertPaths,
- nodeCache: make(map[string]*databasev1.Node),
- handlers: make([]schema.EventHandler, 0),
lastSuccessfulDNS: make(map[string][]string),
pathToReloader: make(map[string]*pkgtls.Reloader),
srvAddrToPath: make(map[string]string),
- closer: run.NewCloser(1),
- log: logger.GetLogger("metadata-discovery-dns"),
+ addressToSrvAddr: make(map[string]string),
+ closer: run.NewCloser(2),
resolver: &defaultResolver{},
}
+ // initialize discovery service base (cache + retry manager)
+ svc.DiscoveryServiceBase = common.NewServiceBase(
+ "metadata-discovery-dns",
+ svc,
+ common.RetryConfig{
+ InitialInterval: cfg.RetryInitialInterval,
+ MaxInterval: cfg.RetryMaxInterval,
+ Multiplier: cfg.RetryMultiplier,
+ },
+ )
+
// create shared reloaders for CA certificates
if svc.tlsEnabled {
for srvIdx, certPath := range cfg.CACertPaths {
@@ -127,13 +136,13 @@ func NewService(cfg Config) (*Service, error) {
// check if we already have a Reloader for this path
if _, exists := svc.pathToReloader[certPath]; exists {
- svc.log.Debug().Str("certPath",
certPath).Int("srvIndex", srvIdx).
+ svc.GetLogger().Debug().Str("certPath",
certPath).Int("srvIndex", srvIdx).
Msg("Reusing existing CA certificate
reloader")
continue
}
// create new Reloader for this unique path
- reloader, reloaderErr := pkgtls.NewClientCertReloader(certPath, svc.log)
+ reloader, reloaderErr := pkgtls.NewClientCertReloader(certPath, svc.GetLogger())
if reloaderErr != nil {
// clean up any already-created reloaders
for _, r := range svc.pathToReloader {
@@ -144,7 +153,7 @@ func NewService(cfg Config) (*Service, error) {
}
svc.pathToReloader[certPath] = reloader
- svc.log.Info().Str("certPath",
certPath).Int("srvIndex", srvIdx).
+ svc.GetLogger().Info().Str("certPath",
certPath).Int("srvIndex", srvIdx).
Str("srvAddress", srvAddr).Msg("Initialized DNS
CA certificate reloader")
}
}
@@ -162,6 +171,60 @@ func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
return svc, nil
}
+// GetDialOptions implements GRPCDialOptionsProvider for DNS-specific TLS setup.
+func (s *Service) GetDialOptions(address string) ([]grpc.DialOption, error) {
+ if !s.tlsEnabled {
+ return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+ }
+
+ // find which SRV address this node address belongs to
+ s.addressMutex.RLock()
+ srvAddr, found := s.addressToSrvAddr[address]
+ s.addressMutex.RUnlock()
+
+ if !found {
+ // fallback: use any available SRV for backward compatibility
+ for srv := range s.srvAddrToPath {
+ srvAddr = srv
+ break
+ }
+ if srvAddr == "" {
+ return nil, fmt.Errorf("no SRV address mapping found
for %s", address)
+ }
+ }
+
+ // look up which Reloader to use for this address
+ if len(s.pathToReloader) > 0 {
+ // look up the cert path for this SRV address
+ certPath, pathExists := s.srvAddrToPath[srvAddr]
+ if !pathExists {
+ return nil, fmt.Errorf("no cert path found for SRV %s
(address %s)", srvAddr, address)
+ }
+
+ // get the Reloader for this cert path
+ reloader, reloaderExists := s.pathToReloader[certPath]
+ if !reloaderExists {
+ return nil, fmt.Errorf("no reloader found for cert path
%s (address %s)", certPath, address)
+ }
+
+ // get fresh TLS config from the Reloader
+ tlsConfig, configErr := reloader.GetClientTLSConfig("")
+ if configErr != nil {
+ return nil, fmt.Errorf("failed to get TLS config from
reloader for address %s: %w", address, configErr)
+ }
+
+ creds := credentials.NewTLS(tlsConfig)
+ return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+ }
+
+ // fallback to static TLS config (when no reloaders configured)
+ opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, "")
+ if err != nil {
+ return nil, fmt.Errorf("failed to load TLS config: %w", err)
+ }
+ return opts, nil
+}
+
func (s *Service) getTLSDialOptions(srvAddr, address string) ([]grpc.DialOption, error) {
if !s.tlsEnabled {
return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
@@ -201,7 +264,7 @@ func (s *Service) getTLSDialOptions(srvAddr, address string) ([]grpc.DialOption,
// Start begins the DNS discovery background process.
func (s *Service) Start(ctx context.Context) error {
- s.log.Debug().Msg("Starting DNS-based node discovery service")
+ s.GetLogger().Debug().Msg("Starting DNS-based node discovery service")
// start all Reloaders
if len(s.pathToReloader) > 0 {
@@ -216,22 +279,41 @@ func (s *Service) Start(ctx context.Context) error {
return fmt.Errorf("failed to start CA
certificate reloader for path %s: %w", certPath, startErr)
}
startedReloaders = append(startedReloaders, reloader)
- s.log.Debug().Str("certPath", certPath).Msg("Started CA
certificate reloader")
+ s.GetLogger().Debug().Str("certPath",
certPath).Msg("Started CA certificate reloader")
}
}
go s.discoveryLoop(ctx)
+ go s.retryScheduler(ctx)
return nil
}
+func (s *Service) retryScheduler(ctx context.Context) {
+ // check retry queue every second
+ checkTicker := time.NewTicker(1 * time.Second)
+ defer checkTicker.Stop()
+
+ for {
+ select {
+ case <-checkTicker.C:
+ s.RetryManager.ProcessRetryQueue(ctx)
+
+ case <-s.closer.CloseNotify():
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
func (s *Service) discoveryLoop(ctx context.Context) {
// add the init phase finish time
initPhaseEnd := time.Now().Add(s.initDuration)
for {
if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
- s.log.Err(err).Msg("failed to query DNS and update
nodes")
+ s.GetLogger().Err(err).Msg("failed to query DNS and
update nodes")
}
// wait for next interval
@@ -281,13 +363,13 @@ func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
for srv, err := range srvToErrors {
if cachedAddrs, exists := s.lastSuccessfulDNS[srv]; exists {
finalAddresses[srv] = cachedAddrs
- s.log.Warn().
+ s.GetLogger().Warn().
Str("srv", srv).
Err(err).
Strs("cached_addresses", cachedAddrs).
Msg("Using cached addresses for failed SRV
query")
} else {
- s.log.Warn().
+ s.GetLogger().Warn().
Str("srv", srv).
Err(err).
Msg("SRV query failed and no cached addresses
available")
@@ -308,12 +390,12 @@ func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
return fmt.Errorf("all DNS queries failed and no cached
addresses available: %w", errors.Join(allErrors...))
}
- if s.log.Debug().Enabled() {
+ if s.GetLogger().Debug().Enabled() {
totalAddrs := 0
for _, addrs := range finalAddresses {
totalAddrs += len(addrs)
}
- s.log.Debug().
+ s.GetLogger().Debug().
Int("total_addresses", totalAddrs).
Int("successful_srvs", len(srvToAddresses)).
Int("failed_srvs", len(srvToErrors)).
@@ -380,87 +462,74 @@ func (s *Service) queryAllSRVRecords(ctx context.Context) (map[string][]string,
func (s *Service) updateNodeCache(ctx context.Context, srvToAddresses map[string][]string) error {
var addErrors []error
+ s.addressMutex.Lock()
for srvAddr, addrs := range srvToAddresses {
for _, addr := range addrs {
- s.cacheMutex.RLock()
- _, exists := s.nodeCache[addr]
- s.cacheMutex.RUnlock()
+ s.addressToSrvAddr[addr] = srvAddr
+ }
+ }
+ s.addressMutex.Unlock()
+
+ // process new nodes
+ for _, addrs := range srvToAddresses {
+ for _, addr := range addrs {
+ _, exists := s.GetCachedNode(addr)
if !exists {
+ if s.RetryManager.IsInRetry(addr) {
+ continue
+ }
+
// fetch node metadata from gRPC
- node, fetchErr := s.fetchNodeMetadata(ctx, srvAddr, addr)
+ node, fetchErr := s.fetchNodeMetadata(ctx, addr)
if fetchErr != nil {
- s.log.Warn().
+ s.GetLogger().Warn().
Err(fetchErr).
Str("address", addr).
Msg("Failed to fetch node
metadata")
addErrors = append(addErrors, fetchErr)
+ s.RetryManager.AddToRetry(addr, fetchErr)
continue
}
- s.cacheMutex.Lock()
- if _, alreadyAdded := s.nodeCache[addr]; !alreadyAdded {
- s.nodeCache[addr] = node
-
- // notify handlers after releasing lock
- s.notifyHandlers(schema.Metadata{
- TypeMeta: schema.TypeMeta{
- Kind: schema.KindNode,
- Name: node.GetMetadata().GetName(),
- },
- Spec: node,
- }, true)
-
- s.log.Debug().
- Str("address", addr).
- Str("name",
node.GetMetadata().GetName()).
- Msg("New node discovered and
added to cache")
- }
- s.cacheMutex.Unlock()
+ s.AddNodeAndNotify(addr, node, "New node
discovered and added to cache")
}
}
}
- // collect nodes to delete first
+ // collect addresses to keep
allAddr := make(map[string]bool)
for _, addrs := range srvToAddresses {
for _, addr := range addrs {
allAddr[addr] = true
}
}
- s.cacheMutex.Lock()
- nodesToDelete := make(map[string]*databasev1.Node)
- for addr, node := range s.nodeCache {
+
+ // find nodes to delete
+ currentAddresses := s.GetAllNodeAddresses()
+ addressesToDelete := make([]string, 0)
+ for _, addr := range currentAddresses {
if !allAddr[addr] {
- nodesToDelete[addr] = node
+ addressesToDelete = append(addressesToDelete, addr)
}
}
- // delete from cache while still holding lock
- for addr, node := range nodesToDelete {
- delete(s.nodeCache, addr)
- s.log.Debug().
- Str("address", addr).
- Str("name", node.GetMetadata().GetName()).
- Msg("Node removed from cache (no longer in DNS)")
- }
- cacheSize := len(s.nodeCache)
- s.cacheMutex.Unlock()
+ // delete nodes and notify handlers
+ s.RemoveNodesAndNotify(addressesToDelete, "Node removed from cache (no
longer in DNS)")
- // Notify handlers after releasing lock
- for _, node := range nodesToDelete {
- s.notifyHandlers(schema.Metadata{
- TypeMeta: schema.TypeMeta{
- Kind: schema.KindNode,
- Name: node.GetMetadata().GetName(),
- },
- Spec: node,
- }, false)
+ // clean up address to SRV mapping for deleted nodes
+ s.addressMutex.Lock()
+ for _, addr := range addressesToDelete {
+ delete(s.addressToSrvAddr, addr)
}
+ s.addressMutex.Unlock()
+
+ // clean up retry queue for removed nodes
+ s.RetryManager.CleanupRetryQueue(allAddr)
// update total nodes metric
if s.metrics != nil {
- s.metrics.totalNodesCount.Set(float64(cacheSize))
+ s.metrics.totalNodesCount.Set(float64(s.GetCacheSize()))
}
if len(addErrors) > 0 {
@@ -470,7 +539,12 @@ func (s *Service) updateNodeCache(ctx context.Context, srvToAddresses map[string
return nil
}
-func (s *Service) fetchNodeMetadata(ctx context.Context, srvAddr, address string) (*databasev1.Node, error) {
+// FetchNodeWithRetry implements NodeFetcher interface for retry manager.
+func (s *Service) FetchNodeWithRetry(ctx context.Context, address string) (*databasev1.Node, error) {
+ return s.fetchNodeMetadata(ctx, address)
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, address string) (*databasev1.Node, error) {
// record gRPC query metrics
startTime := time.Now()
var grpcErr error
@@ -486,70 +560,31 @@ func (s *Service) fetchNodeMetadata(ctx context.Context, srvAddr, address string
}
}()
- ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
- defer cancel()
-
- // for TLS connections with other nodes to getting metadata
- dialOpts, err := s.getTLSDialOptions(srvAddr, address)
+ // use common fetcher
+ node, err := common.FetchNodeMetadata(ctx, address, s.grpcTimeout, s)
if err != nil {
- grpcErr = fmt.Errorf("failed to get TLS dial options: %w", err)
- return nil, grpcErr
- }
- // nolint:contextcheck
- conn, connErr := grpchelper.Conn(address, s.grpcTimeout, dialOpts...)
- if connErr != nil {
- grpcErr = fmt.Errorf("failed to connect to %s: %w", address,
connErr)
- return nil, grpcErr
- }
- defer conn.Close()
-
- // query metadata of the node
- client := databasev1.NewNodeQueryServiceClient(conn)
- resp, callErr := client.GetCurrentNode(ctxTimeout, &databasev1.GetCurrentNodeRequest{})
- if callErr != nil {
- grpcErr = fmt.Errorf("failed to get current node from %s: %w",
address, callErr)
- return nil, grpcErr
- }
-
- return resp.GetNode(), nil
-}
-
-func (s *Service) notifyHandlers(metadata schema.Metadata, isAddOrUpdate bool) {
- s.handlersMutex.RLock()
- defer s.handlersMutex.RUnlock()
-
- for _, handler := range s.handlers {
- if isAddOrUpdate {
- handler.OnAddOrUpdate(metadata)
- } else {
- handler.OnDelete(metadata)
- }
+ grpcErr = err
+ return nil, err
}
-}
-
-// RegisterHandler registers an event handler for node changes.
-func (s *Service) RegisterHandler(name string, handler schema.EventHandler) {
- s.handlersMutex.Lock()
- defer s.handlersMutex.Unlock()
-
- s.handlers = append(s.handlers, handler)
- s.log.Debug().Str("handler", name).Msg("Registered DNS node discovery
handler")
+ return node, nil
}
// SetMetrics set the OMR metrics.
func (s *Service) SetMetrics(factory observability.Factory) {
s.metrics = newMetrics(factory)
+ s.DiscoveryServiceBase.SetMetrics(s.metrics)
}
// Close stops the DNS discovery service.
func (s *Service) Close() error {
- s.closer.Done()
+ s.closer.Done() // for discoveryLoop
+ s.closer.Done() // for retryScheduler
s.closer.CloseThenWait()
// stop all Reloaders
for certPath, reloader := range s.pathToReloader {
reloader.Stop()
- s.log.Debug().Str("certPath", certPath).Msg("Stopped CA
certificate reloader")
+ s.GetLogger().Debug().Str("certPath", certPath).Msg("Stopped CA
certificate reloader")
}
return nil
@@ -566,42 +601,8 @@ func (s *Service) ListNode(ctx context.Context, role databasev1.Role) ([]*databa
return nil, err
}
}
- s.cacheMutex.RLock()
- defer s.cacheMutex.RUnlock()
-
- var result []*databasev1.Node
- for _, node := range s.nodeCache {
- // filter by role if specified
- if role != databasev1.Role_ROLE_UNSPECIFIED {
- hasRole := false
- for _, nodeRole := range node.GetRoles() {
- if nodeRole == role {
- hasRole = true
- break
- }
- }
- if !hasRole {
- continue
- }
- }
- result = append(result, node)
- }
-
- return result, nil
-}
-
-// GetNode current node from cache.
-func (s *Service) GetNode(_ context.Context, nodeName string) (*databasev1.Node, error) {
- s.cacheMutex.RLock()
- defer s.cacheMutex.RUnlock()
-
- for _, node := range s.nodeCache {
- if node.GetMetadata() != nil && node.GetMetadata().GetName() == nodeName {
- return node, nil
- }
- }
-
- return nil, fmt.Errorf("node %s not found", nodeName)
+ // delegate to base for filtering
+ return s.NodeCacheBase.ListNode(ctx, role)
}
// RegisterNode update the configurations of a node, it should not be invoked.
diff --git a/banyand/metadata/dns/dns_test.go b/banyand/metadata/discovery/dns/dns_test.go
similarity index 99%
rename from banyand/metadata/dns/dns_test.go
rename to banyand/metadata/discovery/dns/dns_test.go
index 818df974..5467002f 100644
--- a/banyand/metadata/dns/dns_test.go
+++ b/banyand/metadata/discovery/dns/dns_test.go
@@ -35,7 +35,7 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata/dns"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -516,10 +516,9 @@ var _ = Describe("DNS Discovery Service", func() {
// Call queryDNSAndUpdateNodes again
// One DNS fails, so it should fallback to cached addresses (all 4 nodes)
+ // With retry mechanism, nodes already in retry queue are skipped, so no errors returned
queryErr2 := svc.QueryDNSAndUpdateNodes(ctx)
- Expect(queryErr2).To(HaveOccurred())
- // Verify the gRPC error is expected (trying to connect to non-existent servers)
- Expect(queryErr2.Error()).To(ContainSubstring("failed to connect"))
+ Expect(queryErr2).NotTo(HaveOccurred())
// Verify fallback happened - cache still has all 4 nodes from first success
cachedAfterFailure := svc.GetLastSuccessfulDNS()
diff --git a/banyand/metadata/dns/export_test.go b/banyand/metadata/discovery/dns/export_test.go
similarity index 92%
rename from banyand/metadata/dns/export_test.go
rename to banyand/metadata/discovery/dns/export_test.go
index a28b4cf7..fd1b2564 100644
--- a/banyand/metadata/dns/export_test.go
+++ b/banyand/metadata/discovery/dns/export_test.go
@@ -63,12 +63,12 @@ func (s *Service) GetReloaderCount() int {
// GetNodeCache returns the current node cache for testing.
func (s *Service) GetNodeCache() map[string]*databasev1.Node {
- s.cacheMutex.RLock()
- defer s.cacheMutex.RUnlock()
-
- result := make(map[string]*databasev1.Node, len(s.nodeCache))
- for k, v := range s.nodeCache {
- result[k] = v
+ result := make(map[string]*databasev1.Node)
+ addresses := s.GetAllNodeAddresses()
+ for _, addr := range addresses {
+ if node, exists := s.GetCachedNode(addr); exists {
+ result[addr] = node
+ }
}
return result
}
diff --git a/banyand/metadata/dns/metrics.go b/banyand/metadata/discovery/dns/metrics.go
similarity index 75%
rename from banyand/metadata/dns/metrics.go
rename to banyand/metadata/discovery/dns/metrics.go
index 87038580..04e0b0ca 100644
--- a/banyand/metadata/dns/metrics.go
+++ b/banyand/metadata/discovery/dns/metrics.go
@@ -44,6 +44,12 @@ type metrics struct {
discoveryDuration meter.Histogram
totalNodesCount meter.Gauge
+
+ // Retry-related metrics
+ nodeRetryCount meter.Counter
+ nodeRetryFailedCount meter.Counter
+ nodeRetrySuccessCount meter.Counter
+ retryQueueSize meter.Gauge
}
// newMetrics creates a new metrics instance.
@@ -70,5 +76,31 @@ func newMetrics(factory observability.Factory) *metrics {
discoveryDuration: factory.NewHistogram("discovery_duration", meter.DefBuckets),
totalNodesCount: factory.NewGauge("total_nodes_count"),
+
+ // Retry-related metrics
+ nodeRetryCount: factory.NewCounter("node_retry_count"),
+ nodeRetryFailedCount: factory.NewCounter("node_retry_failed_count"),
+ nodeRetrySuccessCount: factory.NewCounter("node_retry_success_count"),
+ retryQueueSize: factory.NewGauge("retry_queue_size"),
}
}
+
+// IncRetryCount implements RetryMetrics interface.
+func (m *metrics) IncRetryCount() {
+ m.nodeRetryCount.Inc(1)
+}
+
+// IncRetrySuccess implements RetryMetrics interface.
+func (m *metrics) IncRetrySuccess() {
+ m.nodeRetrySuccessCount.Inc(1)
+}
+
+// IncRetryFailed implements RetryMetrics interface.
+func (m *metrics) IncRetryFailed() {
+ m.nodeRetryFailedCount.Inc(1)
+}
+
+// SetQueueSize implements RetryMetrics interface.
+func (m *metrics) SetQueueSize(size float64) {
+ m.retryQueueSize.Set(size)
+}
diff --git a/banyand/metadata/dns/testdata/ca_cert.pem b/banyand/metadata/discovery/dns/testdata/ca_cert.pem
similarity index 100%
rename from banyand/metadata/dns/testdata/ca_cert.pem
rename to banyand/metadata/discovery/dns/testdata/ca_cert.pem
diff --git a/banyand/metadata/dns/testdata/server_key.pem b/banyand/metadata/discovery/dns/testdata/server_key.pem
similarity index 100%
rename from banyand/metadata/dns/testdata/server_key.pem
rename to banyand/metadata/discovery/dns/testdata/server_key.pem
diff --git a/banyand/metadata/discovery/file/file.go b/banyand/metadata/discovery/file/file.go
new file mode 100644
index 00000000..7938d466
--- /dev/null
+++ b/banyand/metadata/discovery/file/file.go
@@ -0,0 +1,466 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file implements file-based node discovery for distributed metadata management.
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/fsnotify/fsnotify"
+ "google.golang.org/grpc"
+ "gopkg.in/yaml.v3"
+
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// Service implements file-based node discovery.
+type Service struct {
+ *common.DiscoveryServiceBase
+ addressToNodeConfig map[string]NodeConfig
+ closer *run.Closer
+ metrics *metrics
+ watcher *fsnotify.Watcher
+ filePath string
+ grpcTimeout time.Duration
+ fetchInterval time.Duration
+ reloadMutex sync.Mutex
+ configMutex sync.RWMutex
+ started bool
+}
+
+// Config holds configuration for file discovery service.
+type Config struct {
+ FilePath string
+ GRPCTimeout time.Duration
+ FetchInterval time.Duration
+ RetryInitialInterval time.Duration
+ RetryMaxInterval time.Duration
+ RetryMultiplier float64
+}
+
+// NodeFileConfig represents the YAML configuration file structure.
+type NodeFileConfig struct {
+ Nodes []NodeConfig `yaml:"nodes"`
+}
+
+// NodeConfig represents a single node configuration.
+type NodeConfig struct {
+ Name string `yaml:"name"`
+ Address string `yaml:"grpc_address"`
+ CACertPath string `yaml:"ca_cert_path"`
+ TLSEnabled bool `yaml:"tls_enabled"`
+}
+
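[Editor's illustrative sketch, not part of the commit: given these yaml tags, the discovery file is a top-level nodes list. The node names, addresses, and certificate path below are made up; the snippet parses the sample the same way loadAndParseFile does, via gopkg.in/yaml.v3.]

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v3"
    )

    // Mirrors NodeFileConfig/NodeConfig above.
    type nodeFileConfig struct {
        Nodes []struct {
            Name       string `yaml:"name"`
            Address    string `yaml:"grpc_address"`
            CACertPath string `yaml:"ca_cert_path"`
            TLSEnabled bool   `yaml:"tls_enabled"`
        } `yaml:"nodes"`
    }

    const sample = `
    nodes:
      - name: data-node-0
        grpc_address: 10.0.0.1:17912
      - name: data-node-1
        grpc_address: 10.0.0.2:17912
        tls_enabled: true
        ca_cert_path: /etc/banyandb/certs/ca.pem
    `

    func main() {
        var cfg nodeFileConfig
        if err := yaml.Unmarshal([]byte(sample), &cfg); err != nil {
            panic(err)
        }
        fmt.Println(len(cfg.Nodes), cfg.Nodes[1].Address) // 2 10.0.0.2:17912
    }
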
+// NewService creates a new file discovery service.
+func NewService(cfg Config) (*Service, error) {
+ if cfg.FilePath == "" {
+ return nil, errors.New("file path cannot be empty")
+ }
+
+ // validate file exists and is readable
+ if _, err := os.Stat(cfg.FilePath); err != nil {
+ return nil, fmt.Errorf("failed to access file path %s: %w",
cfg.FilePath, err)
+ }
+
+ // validate retry config
+ if cfg.RetryMultiplier < 1.0 {
+ return nil, fmt.Errorf("retry multiplier must be >= 1.0, got
%f", cfg.RetryMultiplier)
+ }
+ if cfg.RetryMaxInterval < cfg.RetryInitialInterval {
+ return nil, fmt.Errorf("retry max interval (%v) must be >=
initial interval (%v)",
+ cfg.RetryMaxInterval, cfg.RetryInitialInterval)
+ }
+ if cfg.RetryInitialInterval <= 0 {
+ return nil, errors.New("retry initial interval must be
positive")
+ }
+
+ // create fsnotify watcher
+ fileWatcher, watchErr := fsnotify.NewWatcher()
+ if watchErr != nil {
+ return nil, fmt.Errorf("failed to create fsnotify watcher: %w",
watchErr)
+ }
+
+ svc := &Service{
+ filePath: cfg.FilePath,
+ addressToNodeConfig: make(map[string]NodeConfig),
+ closer: run.NewCloser(3),
+ grpcTimeout: cfg.GRPCTimeout,
+ fetchInterval: cfg.FetchInterval,
+ watcher: fileWatcher,
+ }
+
+ // initialize discovery service base
+ svc.DiscoveryServiceBase = common.NewServiceBase(
+ "metadata-discovery-file",
+ svc,
+ common.RetryConfig{
+ InitialInterval: cfg.RetryInitialInterval,
+ MaxInterval: cfg.RetryMaxInterval,
+ Multiplier: cfg.RetryMultiplier,
+ },
+ )
+
+ // set callbacks for retry manager
+ svc.RetryManager.SetSuccessCallback(func(_ string, _ *databasev1.Node) {
+ if svc.metrics != nil {
+ svc.metrics.nodeConnectedCount.Inc(1)
+ }
+ })
+
+ return svc, nil
+}
+
+// GetDialOptions implements GRPCDialOptionsProvider for file-specific per-node TLS.
+func (s *Service) GetDialOptions(address string) ([]grpc.DialOption, error) {
+ s.configMutex.RLock()
+ nodeConfig, exists := s.addressToNodeConfig[address]
+ s.configMutex.RUnlock()
+
+ if !exists {
+ return nil, fmt.Errorf("no node configuration found for address
%s", address)
+ }
+
+ return grpchelper.SecureOptions(nil, nodeConfig.TLSEnabled, false, nodeConfig.CACertPath)
+}
+
+// Start begins the file discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+ s.GetLogger().Debug().Str("file_path", s.filePath).Msg("Starting
file-based node discovery service")
+
+ // initial load
+ if err := s.loadAndParseFile(ctx); err != nil {
+ return fmt.Errorf("failed to load initial configuration: %w",
err)
+ }
+
+ // start fsnotify watcher
+ if err := s.watcher.Add(s.filePath); err != nil {
+ return fmt.Errorf("failed to add file to watcher: %w", err)
+ }
+
+ // mark as started
+ s.started = true
+
+ // start goroutines
+ go s.watchFileChanges(ctx)
+ go s.periodicFetch(ctx)
+ go s.retryScheduler(ctx)
+
+ return nil
+}
+
+func (s *Service) loadAndParseFile(ctx context.Context) error {
+ s.reloadMutex.Lock()
+ defer s.reloadMutex.Unlock()
+ startTime := time.Now()
+ var parseErr error
+ defer func() {
+ if s.metrics != nil {
+ duration := time.Since(startTime)
+ s.metrics.fileLoadCount.Inc(1)
+ s.metrics.fileLoadDuration.Observe(duration.Seconds())
+ if parseErr != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+ }
+ }()
+
+ data, err := os.ReadFile(s.filePath)
+ if err != nil {
+ parseErr = fmt.Errorf("failed to read file: %w", err)
+ return parseErr
+ }
+
+ var cfg NodeFileConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ return fmt.Errorf("failed to parse YAML: %w", err)
+ }
+
+ // validate required fields
+ for idx, node := range cfg.Nodes {
+ if node.Address == "" {
+ parseErr = fmt.Errorf("node %s at index %d is missing
required field: grpc_address", node.Name, idx)
+ return parseErr
+ }
+ if node.TLSEnabled && node.CACertPath == "" {
+ parseErr = fmt.Errorf("node %s at index %d has TLS
enabled but missing ca_cert_path", node.Name, idx)
+ return parseErr
+ }
+ }
+
+ // update cache
+ s.updateNodeCache(ctx, cfg.Nodes)
+
+ s.GetLogger().Debug().Int("node_count",
len(cfg.Nodes)).Msg("Successfully loaded configuration file")
+ return nil
+}
+
+// FetchNodeWithRetry implements NodeFetcher interface for retry manager.
+func (s *Service) FetchNodeWithRetry(ctx context.Context, address string)
(*databasev1.Node, error) {
+ // use common fetcher with address
+ return common.FetchNodeMetadata(ctx, address, s.grpcTimeout, s)
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, nodeConfig
NodeConfig) (*databasev1.Node, error) {
+ node, err := common.FetchNodeMetadata(ctx, nodeConfig.Address,
s.grpcTimeout, s)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch metadata for node %s:
%w", nodeConfig.Name, err)
+ }
+
+ return node, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, newNodes []NodeConfig) {
+ newAddressToNodeConfig := make(map[string]NodeConfig)
+ for _, nodeConfig := range newNodes {
+ newAddressToNodeConfig[nodeConfig.Address] = nodeConfig
+ }
+
+ // save old config for comparison and replace with new one
+ s.configMutex.Lock()
+ oldAddressToNodeConfig := s.addressToNodeConfig
+ s.addressToNodeConfig = newAddressToNodeConfig
+ s.configMutex.Unlock()
+
+ // process new or updated nodes
+ for _, nodeConfig := range newNodes {
+ _, existsInCache := s.GetCachedNode(nodeConfig.Address)
+ // node already connected, skip
+ if existsInCache {
+ continue
+ }
+
+ // node in retry queue - check if config changed
+ if s.RetryManager.IsInRetry(nodeConfig.Address) {
+ oldConfig, hasOldConfig :=
oldAddressToNodeConfig[nodeConfig.Address]
+
+ if hasOldConfig && nodeConfigChanged(oldConfig,
nodeConfig) {
+ s.GetLogger().Info().
+ Str("address", nodeConfig.Address).
+ Msg("Node configuration changed,
resetting retry state")
+
s.RetryManager.RemoveFromRetry(nodeConfig.Address)
+ } else {
+ // config unchanged, let retry scheduler handle
it
+ continue
+ }
+ }
+
+ // try to fetch metadata for new node
+ node, fetchErr := s.fetchNodeMetadata(ctx, nodeConfig)
+ if fetchErr != nil {
+ // add to retry queue
+ s.RetryManager.AddToRetry(nodeConfig.Address, fetchErr)
+ continue
+ }
+
+ // successfully fetched - add to cache
+ if s.AddNodeAndNotify(nodeConfig.Address, node, "Node
discovered and added to cache") {
+ if s.metrics != nil {
+ s.metrics.nodeConnectedCount.Inc(1)
+ }
+ }
+ }
+
+ // find nodes to delete
+ currentAddresses := s.GetAllNodeAddresses()
+ addressesToDelete := make([]string, 0)
+ for _, addr := range currentAddresses {
+ if _, inFile := newAddressToNodeConfig[addr]; !inFile {
+ addressesToDelete = append(addressesToDelete, addr)
+ }
+ }
+
+ // delete nodes and notify handlers
+ s.RemoveNodesAndNotify(addressesToDelete, "Node removed from cache (no
longer in file)")
+
+ // clean up retry queue for removed nodes
+ validAddresses := make(map[string]bool)
+ for addr := range newAddressToNodeConfig {
+ validAddresses[addr] = true
+ }
+ s.RetryManager.CleanupRetryQueue(validAddresses)
+
+ // update metrics
+ if s.metrics != nil {
+ s.metrics.totalNodesCount.Set(float64(s.GetCacheSize()))
+
s.metrics.retryQueueSize.Set(float64(s.RetryManager.GetQueueSize()))
+ }
+}
+
+// SetMetrics sets the OMR metrics.
+func (s *Service) SetMetrics(factory observability.Factory) {
+ s.metrics = newMetrics(factory)
+ s.DiscoveryServiceBase.SetMetrics(s.metrics)
+}
+
+// Close stops the file discovery service.
+func (s *Service) Close() error {
+ s.GetLogger().Debug().Msg("Closing file discovery service")
+
+ // close fsnotify watcher
+ if s.watcher != nil {
+ if err := s.watcher.Close(); err != nil {
+ s.GetLogger().Error().Err(err).Msg("Failed to close
fsnotify watcher")
+ }
+ }
+
+ // only wait for goroutines if Start() was called
+ if s.started {
+ s.closer.CloseThenWait()
+ }
+
+ return nil
+}
+
+// RegisterNode is not supported in file discovery mode.
+func (s *Service) RegisterNode(_ context.Context, _ *databasev1.Node, _ bool)
error {
+ return errors.New("manual node registration not supported in file
discovery mode")
+}
+
+// UpdateNode is not supported in file discovery mode.
+func (s *Service) UpdateNode(_ context.Context, _ *databasev1.Node) error {
+ return errors.New("manual node update not supported in file discovery
mode")
+}
+
+func (s *Service) watchFileChanges(ctx context.Context) {
+ defer s.closer.Done()
+
+ for {
+ select {
+ case event, ok := <-s.watcher.Events:
+ if !ok {
+ s.GetLogger().Info().Msg("fsnotify watcher
events channel closed")
+ return
+ }
+
+ s.GetLogger().Debug().
+ Str("file", event.Name).
+ Str("op", event.Op.String()).
+ Msg("Detected node metadata file changed event")
+
+ // handle relevant events (Write, Create, Remove, Rename),
+ // following the same approach as the tlsReloader
+ if
event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename) != 0 {
+ // re-add file to watcher if it was
removed/renamed
+ if event.Op&(fsnotify.Remove|fsnotify.Rename)
!= 0 {
+ s.GetLogger().Info().Str("file",
event.Name).Msg("File removed/renamed, re-adding to watcher")
+ _ = s.watcher.Remove(s.filePath)
+
+ // wait briefly for file operations to
complete
+ time.Sleep(1 * time.Second)
+
+ // try to re-add with retry
+ maxRetries := 5
+ for attempt := 0; attempt < maxRetries;
attempt++ {
+ if addErr :=
s.watcher.Add(s.filePath); addErr == nil {
+
s.GetLogger().Debug().Str("file", s.filePath).Msg("Re-added file to watcher")
+ break
+ } else if attempt ==
maxRetries-1 {
+ s.GetLogger().Error().
+ Err(addErr).
+ Str("file",
s.filePath).
+ Msg("Failed to
re-add file to watcher after retries")
+ }
+ time.Sleep(500 *
time.Millisecond)
+ }
+ } else {
+ // wait for file stability
+ time.Sleep(200 * time.Millisecond)
+ }
+
+ // immediate reload
+ if reloadErr := s.loadAndParseFile(ctx);
reloadErr != nil {
+
s.GetLogger().Warn().Err(reloadErr).Msg("Failed to reload configuration file
(fsnotify)")
+ } else {
+ s.GetLogger().Info().Msg("Configuration
file reloaded successfully (fsnotify)")
+ }
+ }
+
+ case watchErr, ok := <-s.watcher.Errors:
+ if !ok {
+ s.GetLogger().Info().Msg("fsnotify watcher
errors channel closed")
+ return
+ }
+ s.GetLogger().Error().Err(watchErr).Msg("Error from
fsnotify watcher")
+ if s.metrics != nil {
+ s.metrics.fileLoadFailedCount.Inc(1)
+ }
+
+ case <-s.closer.CloseNotify():
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (s *Service) periodicFetch(ctx context.Context) {
+ defer s.closer.Done()
+
+ fetchTicker := time.NewTicker(s.fetchInterval)
+ defer fetchTicker.Stop()
+
+ for {
+ select {
+ case <-fetchTicker.C:
+ if err := s.loadAndParseFile(ctx); err != nil {
+ s.GetLogger().Warn().Err(err).Msg("Failed to
reload configuration file (periodic)")
+ } else {
+ s.GetLogger().Debug().Msg("Configuration file
reloaded successfully (periodic)")
+ }
+ case <-s.closer.CloseNotify():
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (s *Service) retryScheduler(ctx context.Context) {
+ defer s.closer.Done()
+
+ // check retry queue every second
+ checkTicker := time.NewTicker(1 * time.Second)
+ defer checkTicker.Stop()
+
+ for {
+ select {
+ case <-checkTicker.C:
+ s.RetryManager.ProcessRetryQueue(ctx)
+
+ case <-s.closer.CloseNotify():
+ return
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func nodeConfigChanged(older, newer NodeConfig) bool {
+ return older.Address != newer.Address ||
+ older.CACertPath != newer.CACertPath ||
+ older.TLSEnabled != newer.TLSEnabled
+}
diff --git a/banyand/metadata/discovery/file/file_test.go
b/banyand/metadata/discovery/file/file_test.go
new file mode 100644
index 00000000..8d0620b3
--- /dev/null
+++ b/banyand/metadata/discovery/file/file_test.go
@@ -0,0 +1,616 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+const (
+ testGRPCTimeout = 2 * time.Second
+ testFetchInterval = 200 * time.Millisecond
+ testRetryInitialInterval = 1 * time.Second
+ testRetryMaxInterval = 5 * time.Minute
+ testRetryMultiplier = 2.0
+)
+
+func testConfig(filePath string) Config {
+ return Config{
+ FilePath: filePath,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ RetryInitialInterval: testRetryInitialInterval,
+ RetryMaxInterval: testRetryMaxInterval,
+ RetryMultiplier: testRetryMultiplier,
+ }
+}
+
+func TestNewService(t *testing.T) {
+ t.Run("valid config", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: 127.0.0.1:17912
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ require.NotNil(t, svc)
+ require.NoError(t, svc.Close())
+ })
+
+ t.Run("empty file path", func(t *testing.T) {
+ _, err := NewService(Config{FilePath: ""})
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "file path cannot be empty")
+ })
+
+ t.Run("non-existent file", func(t *testing.T) {
+ _, err := NewService(testConfig("/not/exist"))
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to access file path")
+ })
+}
+
+func TestStartWithInvalidConfig(t *testing.T) {
+ ctx := context.Background()
+
+ t.Run("invalid yaml", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: [invalid
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to parse YAML")
+ })
+
+ t.Run("missing address", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "missing required field:
grpc_address")
+ })
+
+ t.Run("tls enabled without ca cert", func(t *testing.T) {
+ configFile := createTempConfigFile(t, `
+nodes:
+ - name: node1
+ grpc_address: 127.0.0.1:17912
+ tls_enabled: true
+`)
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "missing ca_cert_path")
+ })
+}
+
+func TestStartAndCacheNodes(t *testing.T) {
+ listener, grpcServer, nodeServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+
+ nodeName := "test-node"
+ serverAddr := listener.Addr().String()
+ nodeServer.setNode(newTestNode(nodeName, serverAddr))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: %s
+ grpc_address: %s
+`, nodeName, serverAddr))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ require.NoError(t, svc.Start(ctx))
+
+ nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
+ require.NoError(t, listErr)
+ require.Len(t, nodes, 1)
+ assert.Equal(t, nodeName, nodes[0].GetMetadata().GetName())
+ assert.Equal(t, serverAddr, nodes[0].GetGrpcAddress())
+
+ nodeFromCache, getErr := svc.GetNode(ctx, nodeName)
+ require.NoError(t, getErr)
+ assert.Equal(t, nodeName, nodeFromCache.GetMetadata().GetName())
+}
+
+func TestHandlerNotifications(t *testing.T) {
+ listenerOne, grpcServerOne, nodeServerOne := startMockGRPCServer(t)
+ defer grpcServerOne.Stop()
+ defer listenerOne.Close()
+ addrOne := listenerOne.Addr().String()
+ nodeServerOne.setNode(newTestNode("node-one", addrOne))
+
+ listenerTwo, grpcServerTwo, nodeServerTwo := startMockGRPCServer(t)
+ defer grpcServerTwo.Stop()
+ defer listenerTwo.Close()
+ addrTwo := listenerTwo.Addr().String()
+ nodeServerTwo.setNode(newTestNode("node-two", addrTwo))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: node-one
+ grpc_address: %s
+`, addrOne))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var mu sync.Mutex
+ added := make([]string, 0)
+ deleted := make([]string, 0)
+ handler := &testEventHandler{
+ onAdd: func(metadata schema.Metadata) {
+ mu.Lock()
+ defer mu.Unlock()
+ added = append(added, metadata.Name)
+ },
+ onDelete: func(metadata schema.Metadata) {
+ mu.Lock()
+ defer mu.Unlock()
+ deleted = append(deleted, metadata.Name)
+ },
+ }
+ svc.RegisterHandler("test", handler)
+
+ require.NoError(t, svc.Start(ctx))
+
+ require.Eventually(t, func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return len(added) == 1 && added[0] == "node-one"
+ }, 3*time.Second, 50*time.Millisecond)
+
+ updateConfigFile(t, configFile, fmt.Sprintf(`
+nodes:
+ - name: node-two
+ grpc_address: %s
+`, addrTwo))
+
+ require.Eventually(t, func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return len(added) == 2 && added[1] == "node-two" &&
len(deleted) == 1 && deleted[0] == "node-one"
+ }, 5*time.Second, 50*time.Millisecond)
+
+ nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
+ require.NoError(t, listErr)
+ require.Len(t, nodes, 1)
+ assert.Equal(t, "node-two", nodes[0].GetMetadata().GetName())
+}
+
+func TestListNodeRoleFilter(t *testing.T) {
+ listener, grpcServer, nodeServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+ addr := listener.Addr().String()
+ nodeServer.setNode(newTestNode("role-test-node", addr))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: role-test-node
+ grpc_address: %s
+`, addr))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ require.NoError(t, svc.Start(ctx))
+
+ nodes, listErr := svc.ListNode(ctx, databasev1.Role_ROLE_UNSPECIFIED)
+ require.NoError(t, listErr)
+ require.Len(t, nodes, 1)
+
+ nodes, listErr = svc.ListNode(ctx, databasev1.Role_ROLE_DATA)
+ require.NoError(t, listErr)
+ assert.Empty(t, nodes)
+}
+
+func TestGetNode(t *testing.T) {
+ listener, grpcServer, nodeServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+ addr := listener.Addr().String()
+ nodeServer.setNode(newTestNode("cached-node", addr))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: cached-node
+ grpc_address: %s
+`, addr))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ require.NoError(t, svc.Start(ctx))
+
+ node, getErr := svc.GetNode(ctx, "cached-node")
+ require.NoError(t, getErr)
+ assert.Equal(t, "cached-node", node.GetMetadata().GetName())
+
+ _, getErr = svc.GetNode(ctx, "non-existent")
+ require.Error(t, getErr)
+ assert.Contains(t, getErr.Error(), "not found")
+}
+
+func TestConcurrentAccess(t *testing.T) {
+ listener, grpcServer, nodeServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+ addr := listener.Addr().String()
+ nodeServer.setNode(newTestNode("concurrent-node", addr))
+
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: concurrent-node
+ grpc_address: %s
+`, addr))
+ defer os.Remove(configFile)
+
+ svc, err := NewService(testConfig(configFile))
+ require.NoError(t, err)
+ defer svc.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ require.NoError(t, svc.Start(ctx))
+
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for j := 0; j < 100; j++ {
+ if _, errList := svc.ListNode(ctx,
databasev1.Role_ROLE_UNSPECIFIED); errList != nil {
+ t.Errorf("list node: %v", errList)
+ }
+ if _, errGet := svc.GetNode(ctx,
"concurrent-node"); errGet != nil {
+ t.Errorf("get node: %v", errGet)
+ }
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+func createTempConfigFile(t *testing.T, content string) string {
+ t.Helper()
+ tmpDir := t.TempDir()
+ tmpFile := filepath.Join(tmpDir, "nodes.yaml")
+ require.NoError(t, os.WriteFile(tmpFile, []byte(content), 0o600))
+ return tmpFile
+}
+
+func updateConfigFile(t *testing.T, path string, content string) {
+ t.Helper()
+ require.NoError(t, os.WriteFile(path, []byte(content), 0o600))
+}
+
+func newTestNode(name, address string) *databasev1.Node {
+ return &databasev1.Node{
+ Metadata: &commonv1.Metadata{
+ Name: name,
+ },
+ GrpcAddress: address,
+ CreatedAt: timestamppb.Now(),
+ }
+}
+
+type testEventHandler struct {
+ onAdd func(metadata schema.Metadata)
+ onDelete func(metadata schema.Metadata)
+}
+
+func (h *testEventHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+ return false, nil
+}
+
+func (h *testEventHandler) OnAddOrUpdate(metadata schema.Metadata) {
+ if h.onAdd != nil {
+ h.onAdd(metadata)
+ }
+}
+
+func (h *testEventHandler) OnDelete(metadata schema.Metadata) {
+ if h.onDelete != nil {
+ h.onDelete(metadata)
+ }
+}
+
+type mockNodeQueryServer struct {
+ databasev1.UnimplementedNodeQueryServiceServer
+ node *databasev1.Node
+ mu sync.RWMutex
+}
+
+func (m *mockNodeQueryServer) GetCurrentNode(_ context.Context, _
*databasev1.GetCurrentNodeRequest) (*databasev1.GetCurrentNodeResponse, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ if m.node == nil {
+ return nil, fmt.Errorf("no node available")
+ }
+ return &databasev1.GetCurrentNodeResponse{Node: m.node}, nil
+}
+
+func (m *mockNodeQueryServer) setNode(node *databasev1.Node) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.node = node
+}
+
+func startMockGRPCServer(t *testing.T) (net.Listener, *grpc.Server,
*mockNodeQueryServer) {
+ t.Helper()
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ require.NoError(t, err)
+
+ mockServer := &mockNodeQueryServer{}
+ grpcServer := grpc.NewServer()
+ databasev1.RegisterNodeQueryServiceServer(grpcServer, mockServer)
+
+ healthServer := health.NewServer()
+ healthServer.SetServingStatus("",
grpc_health_v1.HealthCheckResponse_SERVING)
+ grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
+
+ go func() {
+ _ = grpcServer.Serve(listener)
+ }()
+
+ return listener, grpcServer, mockServer
+}
+
+func TestBackoffRetryMechanism(t *testing.T) {
+ ctx := context.Background()
+
+ // create a mock server that will fail initially
+ listener, grpcServer, mockServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+
+ // initially return error (node not available)
+ mockServer.setNode(nil)
+
+ // create config file with the failing node
+ address := listener.Addr().String()
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: retry-node
+ grpc_address: %s
+`, address))
+ defer os.Remove(configFile)
+
+ // use shorter intervals for testing
+ cfg := Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ RetryInitialInterval: 100 * time.Millisecond,
+ RetryMaxInterval: 1 * time.Second,
+ RetryMultiplier: 2.0,
+ }
+
+ svc, err := NewService(cfg)
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.NoError(t, err)
+
+ // verify node is in retry queue
+ time.Sleep(150 * time.Millisecond)
+ inRetry := svc.RetryManager.IsInRetry(address)
+ require.True(t, inRetry, "Node should be in retry queue")
+ require.Greater(t, svc.RetryManager.GetQueueSize(), 0, "Retry queue
should not be empty")
+
+ // verify node is not in cache yet
+ _, inCache := svc.GetCachedNode(address)
+ require.False(t, inCache, "Node should not be in cache yet")
+
+ // now make the node available
+ mockServer.setNode(newTestNode("retry-node", address))
+
+ // wait for retry to succeed
+ require.Eventually(t, func() bool {
+ _, exists := svc.GetCachedNode(address)
+ return exists
+ }, 3*time.Second, 50*time.Millisecond, "Node should eventually be added
to cache after retry")
+
+ // verify node is removed from retry queue
+ stillInRetry := svc.RetryManager.IsInRetry(address)
+ require.False(t, stillInRetry, "Node should be removed from retry queue
after success")
+}
+
+func TestBackoffResetOnConfigChange(t *testing.T) {
+ ctx := context.Background()
+
+ listener, grpcServer, mockServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+
+ mockServer.setNode(nil) // fail initially
+
+ address := listener.Addr().String()
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: reset-test-node
+ grpc_address: %s
+ tls_enabled: false
+`, address))
+ defer os.Remove(configFile)
+
+ cfg := Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ RetryInitialInterval: 100 * time.Millisecond,
+ RetryMaxInterval: 1 * time.Second,
+ RetryMultiplier: 2.0,
+ }
+
+ svc, err := NewService(cfg)
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.NoError(t, err)
+
+ // wait for node to enter retry queue
+ time.Sleep(150 * time.Millisecond)
+
+ inRetry := svc.RetryManager.IsInRetry(address)
+ require.True(t, inRetry, "Node should be in retry queue")
+ initialQueueSize := svc.RetryManager.GetQueueSize()
+ require.Greater(t, initialQueueSize, 0, "Retry queue should not be
empty")
+
+ // wait a bit for retries to occur
+ time.Sleep(250 * time.Millisecond)
+
+ // verify still in retry (since mock server still returns error)
+ stillInRetry := svc.RetryManager.IsInRetry(address)
+ require.True(t, stillInRetry, "Node should still be in retry queue")
+
+ // now change the config (enable TLS) - this should reset retry state
+ err = os.WriteFile(configFile, []byte(fmt.Sprintf(`
+nodes:
+ - name: reset-test-node-updated
+ grpc_address: %s
+ tls_enabled: true
+ ca_cert_path: /tmp/ca.crt
+`, address)), 0o600)
+ require.NoError(t, err)
+
+ // wait for file change detection and reload
+ // After the config change, the node should be removed from retry and retried immediately.
+ // Since it still fails, it will go back into the retry queue.
+ time.Sleep(500 * time.Millisecond)
+
+ // verify still in retry queue (config changed, but still failing)
+ inRetryAfterChange := svc.RetryManager.IsInRetry(address)
+ require.True(t, inRetryAfterChange, "Node should be in retry queue
after config change")
+}
+
+func TestRetryQueueCleanupOnFileRemoval(t *testing.T) {
+ ctx := context.Background()
+
+ listener, grpcServer, mockServer := startMockGRPCServer(t)
+ defer grpcServer.Stop()
+ defer listener.Close()
+
+ mockServer.setNode(nil) // fail initially
+
+ address := listener.Addr().String()
+ configFile := createTempConfigFile(t, fmt.Sprintf(`
+nodes:
+ - name: cleanup-test-node
+ grpc_address: %s
+`, address))
+ defer os.Remove(configFile)
+
+ cfg := Config{
+ FilePath: configFile,
+ GRPCTimeout: testGRPCTimeout,
+ FetchInterval: testFetchInterval,
+ RetryInitialInterval: 100 * time.Millisecond,
+ RetryMaxInterval: 1 * time.Second,
+ RetryMultiplier: 2.0,
+ }
+
+ svc, err := NewService(cfg)
+ require.NoError(t, err)
+ defer svc.Close()
+
+ err = svc.Start(ctx)
+ require.NoError(t, err)
+
+ // wait for node to enter retry queue
+ time.Sleep(150 * time.Millisecond)
+
+ inRetry := svc.RetryManager.IsInRetry(address)
+ require.True(t, inRetry, "Node should be in retry queue")
+
+ // remove node from file
+ err = os.WriteFile(configFile, []byte(`nodes: []`), 0o600)
+ require.NoError(t, err)
+
+ // wait for file change detection and reload
+ time.Sleep(500 * time.Millisecond)
+
+ // verify node is removed from retry queue
+ stillInRetry := svc.RetryManager.IsInRetry(address)
+ require.False(t, stillInRetry, "Node should be removed from retry
queue")
+}
diff --git a/banyand/metadata/discovery/file/metrics.go
b/banyand/metadata/discovery/file/metrics.go
new file mode 100644
index 00000000..60d49ba4
--- /dev/null
+++ b/banyand/metadata/discovery/file/metrics.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package file
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+ fileLoadCount meter.Counter
+ fileLoadFailedCount meter.Counter
+ fileLoadDuration meter.Histogram
+ totalNodesCount meter.Gauge
+ nodeRetryCount meter.Counter
+ nodeRetryFailedCount meter.Counter
+ nodeRetrySuccessCount meter.Counter
+ nodeConnectedCount meter.Counter
+ retryQueueSize meter.Gauge
+}
+
+// newMetrics creates a new metrics instance.
+func newMetrics(factory observability.Factory) *metrics {
+ return &metrics{
+ fileLoadCount: factory.NewCounter("file_load_count"),
+ fileLoadFailedCount:
factory.NewCounter("file_load_failed_count"),
+ fileLoadDuration:
factory.NewHistogram("file_load_duration", meter.DefBuckets),
+ totalNodesCount: factory.NewGauge("total_nodes_count"),
+ nodeRetryCount: factory.NewCounter("node_retry_count"),
+ nodeRetryFailedCount:
factory.NewCounter("node_retry_failed_count"),
+ nodeRetrySuccessCount:
factory.NewCounter("node_retry_success_count"),
+ nodeConnectedCount:
factory.NewCounter("node_connected_count"),
+ retryQueueSize: factory.NewGauge("retry_queue_size"),
+ }
+}
+
+// IncRetryCount implements RetryMetrics interface.
+func (m *metrics) IncRetryCount() {
+ m.nodeRetryCount.Inc(1)
+}
+
+// IncRetrySuccess implements RetryMetrics interface.
+func (m *metrics) IncRetrySuccess() {
+ m.nodeRetrySuccessCount.Inc(1)
+}
+
+// IncRetryFailed implements RetryMetrics interface.
+func (m *metrics) IncRetryFailed() {
+ m.nodeRetryFailedCount.Inc(1)
+}
+
+// SetQueueSize implements RetryMetrics interface.
+func (m *metrics) SetQueueSize(size float64) {
+ m.retryQueueSize.Set(size)
+}
diff --git a/docs/operation/node-discovery.md b/docs/operation/node-discovery.md
index 990182f1..ee45887f 100644
--- a/docs/operation/node-discovery.md
+++ b/docs/operation/node-discovery.md
@@ -7,12 +7,13 @@ Node discovery enables BanyanDB nodes to locate and
communicate with each other
1. **Request Routing**: When liaison nodes need to send requests to data nodes
for query/write execution.
2. **Data Migration**: When the lifecycle service queries the node list to
perform shard migration and rebalancing.
-BanyanDB supports two discovery mechanisms to accommodate different deployment
environments:
+BanyanDB supports three discovery mechanisms to accommodate different
deployment environments:
- **Etcd-based Discovery**: Traditional distributed consensus approach
suitable for VM deployments and multi-cloud scenarios.
- **DNS-based Discovery**: Cloud-native solution leveraging Kubernetes service
discovery infrastructure.
+- **File-based Discovery**: Static configuration file approach for simple
deployments and testing environments.
-This document provides guidance on configuring and operating both discovery
modes.
+This document provides guidance on configuring and operating all discovery
modes.
## Etcd-Based Discovery
@@ -160,9 +161,13 @@ DNS-based discovery provides a cloud-native alternative
leveraging Kubernetes' b
# TLS configuration
--node-discovery-dns-tls=true # Enable TLS for DNS discovery
(default: false)
---node-discovery-dns-ca-certs=/path/to/ca.crt # CA certificates
(comma-separated)
+--node-discovery-dns-ca-certs=/path/to/ca.crt,/path/to/another.crt # Ordered
CA bundle matching SRV addresses
```
+`node-discovery-dns-fetch-init-interval` and
`node-discovery-dns-fetch-init-duration` define the aggressive polling strategy
during bootstrap before falling back to the steady-state
+`node-discovery-dns-fetch-interval`. All metadata fetches share the same
`node-discovery-grpc-timeout`, which is also reused by the file discovery mode.
When TLS is enabled, the CA cert
+list must match the SRV address order, ensuring each role (e.g., data,
liaison) can be verified with its own trust bundle.
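+
+For example, with `--node-discovery-dns-srv-addresses=_grpc._tcp.data.svc.local,_grpc._tcp.liaison.svc.local`, the first entry in `node-discovery-dns-ca-certs` is used to verify the data endpoints and the second to verify the liaison endpoints; if the ordering does not match, TLS verification for the mismatched role fails.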
+
### Configuration Examples
**Single Zone Discovery:**
@@ -178,7 +183,7 @@ banyand data \
**Multi-Zone Discovery:**
```shell
-banyand query \
+banyand data \
--node-discovery-mode=dns \
--node-discovery-dns-srv-addresses=_grpc._tcp.data.zone1.svc.local,_grpc._tcp.data.zone2.svc.local
\
--node-discovery-dns-fetch-interval=10s
@@ -278,6 +283,110 @@ spec:
This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local`
+## File-Based Discovery
+
+### Overview
+
+File-based discovery provides a simple static configuration approach where
nodes are defined in a YAML file. This mode is ideal for testing environments,
small deployments, or scenarios where dynamic service discovery infrastructure
is unavailable.
+
+The service watches the file with fsnotify and also reloads it periodically, automatically updating the node registry when the file's contents change.
+
+### How it Works
+
+1. Read node configurations from a YAML file on startup
+2. Attempt to connect to each node via gRPC to fetch full metadata
+3. Successfully connected nodes are added to the cache
+4. Nodes that fail to connect are placed in a retry queue and retried with exponential backoff (see the `node-discovery-file-retry-*` flags below)
+5. Reload the file at the `node-discovery-file-fetch-interval` cadence as a backup to fsnotify-based reloads, reprocessing every entry (nodes already in the retry queue keep their backoff schedule unless their configuration changed)
+6. Notify registered handlers when nodes are added or removed
+
+### Configuration Flags
+
+```shell
+# Mode selection
+--node-discovery-mode=file # Enable file mode
+
+# File path (required for file mode)
+--node-discovery-file-path=/path/to/nodes.yaml
+
+# gRPC settings
+--node-discovery-grpc-timeout=5s # Timeout for metadata fetch
(default: 5s)
+
+# Interval settings
+--node-discovery-file-fetch-interval=5m # Polling interval to
reprocess the discovery file (default: 5m)
+--node-discovery-file-retry-initial-interval=1s # Initial retry delay
for failed node metadata fetches (default: 1s)
+--node-discovery-file-retry-max-interval=2m # Upper bound for retry
delay backoff (default: 2m)
+--node-discovery-file-retry-multiplier=2.0 # Multiplicative factor
applied between retries (default: 2.0)
+```
+
+`node-discovery-file-fetch-interval` controls the periodic full reload that
acts as a safety net even if filesystem events are missed.
+`node-discovery-file-retry-*` flags configure the exponential backoff used
when a node cannot be reached over gRPC. Failed nodes are retried starting from
the initial interval,
+multiplied by the configured factor until the max interval is reached.
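+
+With the defaults (`1s` initial interval, `2.0` multiplier, `2m` max interval), the delay sequence is 1s, 2s, 4s, 8s, 16s, 32s, 1m4s, and then 2m for every further attempt. The snippet below is a minimal, illustrative sketch of that schedule only; it is not the service's internal retry implementation:
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func main() {
+	// Documented defaults for file-discovery retries.
+	initial := 1 * time.Second
+	max := 2 * time.Minute
+	multiplier := 2.0
+
+	delay := initial
+	for attempt := 1; attempt <= 8; attempt++ {
+		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
+		next := time.Duration(float64(delay) * multiplier)
+		if next > max {
+			next = max // cap the backoff at the max interval
+		}
+		delay = next
+	}
+}
+```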
+
+### YAML Configuration Format
+
+```yaml
+nodes:
+ - name: liaison-0
+ grpc_address: 192.168.1.10:18912
+ - name: data-hot-0
+ grpc_address: 192.168.1.20:17912
+ tls_enabled: true
+ ca_cert_path: /etc/banyandb/certs/ca.crt
+ - name: data-cold-0
+ grpc_address: 192.168.1.30:17912
+```
+
+**Configuration Fields:**
+
+- **name** (required): Identifier for the node
+- **grpc_address** (required): gRPC endpoint in `host:port` format
+- **tls_enabled** (optional): Enable TLS for gRPC connection (default: false)
+- **ca_cert_path** (optional): Path to CA certificate file (required when TLS
is enabled)
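+
+For reference, each node entry unmarshals into a per-node configuration struct inside the file discovery service. The sketch below is illustrative only; field and `yaml` tag names may differ from the actual types in `banyand/metadata/discovery/file`:
+
+```go
+package main // illustrative sketch only; see banyand/metadata/discovery/file for the authoritative types
+
+type NodeFileConfig struct {
+	Nodes []NodeConfig `yaml:"nodes"`
+}
+
+type NodeConfig struct {
+	Name       string `yaml:"name"`         // display name of the node
+	Address    string `yaml:"grpc_address"` // gRPC endpoint in host:port format
+	TLSEnabled bool   `yaml:"tls_enabled"`  // enable TLS for this node
+	CACertPath string `yaml:"ca_cert_path"` // CA bundle used to verify the node
+}
+```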
+
+### Configuration Examples
+
+**Basic Configuration:**
+
+```shell
+banyand data \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml
+```
+
+**With Custom Polling and Retry Settings:**
+
+```shell
+banyand data \
+ --node-discovery-mode=file \
+ --node-discovery-file-path=/etc/banyandb/nodes.yaml \
+ --node-discovery-file-fetch-interval=20m \
+ --node-discovery-file-retry-initial-interval=5s \
+ --node-discovery-file-retry-max-interval=1m \
+ --node-discovery-file-retry-multiplier=1.5
+```
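+
+With these settings, an unreachable node is retried after 5s, 7.5s, 11.25s, and so on, with delays capped at 1m, while the whole file is reprocessed every 20 minutes as a safety net.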
+
+### Node Lifecycle
+
+#### Initial/Interval Load
+
+When the service starts (and on every fsnotify-triggered or periodic reload):
+1. Reads the YAML configuration file
+2. Validates required fields (`grpc_address`)
+3. Attempts gRPC connection to each node
+4. Successfully connected nodes → added to cache
+5. Unreachable nodes → queued for exponential-backoff retry
+
+### Error Handling
+
+**Startup Errors:**
+- Missing or invalid file path → service fails to start
+- Invalid YAML format → service fails to start
+- Missing required fields → service fails to start
+
+**Runtime Errors:**
+- gRPC connection failure → node placed in the retry queue and retried with exponential backoff
+- File read error → keep existing cache, log error
+- File deleted/renamed → keep existing cache, log error; the watcher re-attaches to the path and reloads once the file reappears
+
## Choosing a Discovery Mode
### Etcd Mode - Best For
@@ -293,4 +402,13 @@ This creates DNS SRV record:
`_grpc._tcp.banyandb-data.default.svc.cluster.local
- Simplified operations (no external etcd management)
- Cloud-native architecture alignment
- StatefulSets with stable network identities
-- Rapid deployment without external dependencies
\ No newline at end of file
+- Rapid deployment without external dependencies
+
+### File Mode - Best For
+
+- Development and testing environments
+- Small static clusters (< 10 nodes)
+- Air-gapped deployments without service discovery infrastructure
+- Proof-of-concept and demo setups
+- Environments where node membership is manually managed
+- Scenarios requiring predictable and auditable node configuration
\ No newline at end of file