Copilot commented on code in PR #920:
URL: https://github.com/apache/skywalking-banyandb/pull/920#discussion_r2656039263


##########
banyand/queue/sub/server.go:
##########
@@ -139,6 +144,27 @@ func (s *server) PreRun(_ context.Context) error {
                s.log.Info().Str("certFile", s.certFile).Str("keyFile", 
s.keyFile).Msg("Initialized TLS reloader for queue server")
        }
 
+       nodeVal := ctx.Value(common.ContextNodeKey)
+       roleVal := ctx.Value(common.ContextNodeRolesKey)
+       if nodeVal == nil || roleVal == nil {
+               s.log.Warn().Msg("node or role value not found in context")
+               return nil
+       }
+       nodeRoles := roleVal.([]databasev1.Role)
+       node := nodeVal.(common.Node)

Review Comment:
   Type assertion without checking may panic if the context value is not the 
expected type. Add type assertion checks or document that these values are 
guaranteed to be the correct types.
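
   A minimal sketch of the guarded assertions, keeping the existing warn-and-return behavior and the identifiers from the snippet above:

   ```go
        nodeVal := ctx.Value(common.ContextNodeKey)
        roleVal := ctx.Value(common.ContextNodeRolesKey)
        if nodeVal == nil || roleVal == nil {
                s.log.Warn().Msg("node or role value not found in context")
                return nil
        }
        // Comma-ok assertions avoid a panic when a context value has an unexpected type.
        node, ok := nodeVal.(common.Node)
        if !ok {
                s.log.Warn().Msg("unexpected type for node value in context")
                return nil
        }
        nodeRoles, ok := roleVal.([]databasev1.Role)
        if !ok {
                s.log.Warn().Msg("unexpected type for node roles value in context")
                return nil
        }
        // node and nodeRoles are then used as in the original change.
   ```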



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.dnsQueryCount.Inc(1)
+                       s.metrics.dnsQueryDuration.Observe(duration.Seconds())
+                       s.metrics.dnsQueryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       allAddresses := make(map[string]bool)
+       var queryErrors []error
+
+       for _, srvAddr := range s.srvAddresses {
+               _, addrs, lookupErr := s.resolver.LookupSRV(ctx, srvAddr)
+               if lookupErr != nil {
+                       queryErrors = append(queryErrors, fmt.Errorf("lookup %s 
failed: %w", srvAddr, lookupErr))
+                       continue
+               }
+
+               for _, srv := range addrs {
+                       address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+                       allAddresses[address] = true
+               }
+       }
+
+       // if there have any error occurred,
+       // then just return the query error to ignore the result to make sure 
the cache correct
+       if len(queryErrors) > 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(queryErrors...)
+       }
+
+       // convert map to slice
+       result := make([]string, 0, len(allAddresses))
+       for addr := range allAddresses {
+               result = append(result, addr)
+       }
+
+       return result, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, addr)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("address", addr).
+                                       Msg("Failed to fetch node metadata")
+                               addErrors = append(addErrors, fetchErr)
+                               continue
+                       }
+
+                       // update cache and notify handlers
+                       s.cacheMutex.Lock()
+                       s.nodeCache[addr] = node
+                       s.cacheMutex.Unlock()
+
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, true)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("New node discovered and added to cache")
+               }
+       }
+
+       s.cacheMutex.Lock()
+       for addr, node := range s.nodeCache {
+               if !addressSet[addr] {
+                       // update cache and notify the handlers
+                       delete(s.nodeCache, addr)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("Node removed from cache (no longer in 
DNS)")
+
+                       s.cacheMutex.Unlock()
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, false)
+                       s.cacheMutex.Lock()
+               }
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // update total nodes metric
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+
+       if len(addErrors) > 0 {
+               return errors.Join(addErrors...)
+       }
+
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, address string) 
(*databasev1.Node, error) {
+       // Record gRPC query metrics
+       startTime := time.Now()
+       var grpcErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.grpcQueryCount.Inc(1)
+                       s.metrics.grpcQueryDuration.Observe(duration.Seconds())
+                       s.metrics.grpcQueryTotalDuration.Inc(duration.Seconds())
+                       if grpcErr != nil {
+                               s.metrics.grpcQueryFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // for TLS connections with other nodes to getting matadata
+       dialOpts, err := s.getTLSDialOptions()
+       if err != nil {
+               grpcErr = fmt.Errorf("failed to get TLS dial options: %w", err)
+               return nil, grpcErr
+       }
+       // nolint:contextcheck
+       conn, connErr := grpchelper.ConnWithAuth(address, s.grpcTimeout, "", 
"", dialOpts...)

Review Comment:
   grpchelper.ConnWithAuth is called with s.grpcTimeout even though ctxTimeout is created just above. The connection should respect the context deadline from ctxTimeout rather than a separate timeout duration.
   ```suggestion
        conn, connErr := grpchelper.ConnWithAuth(address, ctxTimeout, "", "", dialOpts...)
   ```
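
   If grpchelper.ConnWithAuth does take a time.Duration here (as the current call implies), an alternative that still honors the deadline would be to derive the remaining time from ctxTimeout; a rough sketch:

   ```go
        // Derive the dial timeout from the context deadline so the call
        // never outlives ctxTimeout; fall back to s.grpcTimeout otherwise.
        timeout := s.grpcTimeout
        if deadline, ok := ctxTimeout.Deadline(); ok {
                timeout = time.Until(deadline)
        }
        conn, connErr := grpchelper.ConnWithAuth(address, timeout, "", "", dialOpts...)
   ```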



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.dnsQueryCount.Inc(1)
+                       s.metrics.dnsQueryDuration.Observe(duration.Seconds())
+                       s.metrics.dnsQueryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       allAddresses := make(map[string]bool)
+       var queryErrors []error
+
+       for _, srvAddr := range s.srvAddresses {
+               _, addrs, lookupErr := s.resolver.LookupSRV(ctx, srvAddr)
+               if lookupErr != nil {
+                       queryErrors = append(queryErrors, fmt.Errorf("lookup %s 
failed: %w", srvAddr, lookupErr))
+                       continue
+               }
+
+               for _, srv := range addrs {
+                       address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+                       allAddresses[address] = true
+               }
+       }
+
+       // if there have any error occurred,
+       // then just return the query error to ignore the result to make sure 
the cache correct
+       if len(queryErrors) > 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(queryErrors...)
+       }
+
+       // convert map to slice
+       result := make([]string, 0, len(allAddresses))
+       for addr := range allAddresses {
+               result = append(result, addr)
+       }
+
+       return result, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, addr)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("address", addr).
+                                       Msg("Failed to fetch node metadata")
+                               addErrors = append(addErrors, fetchErr)
+                               continue
+                       }
+
+                       // update cache and notify handlers
+                       s.cacheMutex.Lock()
+                       s.nodeCache[addr] = node
+                       s.cacheMutex.Unlock()
+
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, true)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("New node discovered and added to cache")
+               }
+       }
+
+       s.cacheMutex.Lock()
+       for addr, node := range s.nodeCache {
+               if !addressSet[addr] {
+                       // update cache and notify the handlers
+                       delete(s.nodeCache, addr)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("Node removed from cache (no longer in 
DNS)")
+
+                       s.cacheMutex.Unlock()
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, false)
+                       s.cacheMutex.Lock()
+               }
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()
+
+       // update total nodes metric
+       if s.metrics != nil {
+               s.metrics.totalNodesCount.Set(float64(cacheSize))
+       }
+
+       if len(addErrors) > 0 {
+               return errors.Join(addErrors...)
+       }
+
+       return nil
+}
+
+func (s *Service) fetchNodeMetadata(ctx context.Context, address string) 
(*databasev1.Node, error) {
+       // Record gRPC query metrics
+       startTime := time.Now()
+       var grpcErr error
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.grpcQueryCount.Inc(1)
+                       s.metrics.grpcQueryDuration.Observe(duration.Seconds())
+                       s.metrics.grpcQueryTotalDuration.Inc(duration.Seconds())
+                       if grpcErr != nil {
+                               s.metrics.grpcQueryFailedCount.Inc(1)
+                       }
+               }
+       }()
+
+       ctxTimeout, cancel := context.WithTimeout(ctx, s.grpcTimeout)
+       defer cancel()
+
+       // for TLS connections with other nodes to getting matadata

Review Comment:
   Typo in comment: "matadata" should be "metadata"
   ```suggestion
        // for TLS connections with other nodes to getting metadata
   ```



##########
banyand/metadata/client.go:
##########
@@ -96,13 +108,35 @@ func (s *clientService) FlagSet() *run.FlagSet {
        fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key 
for the etcd client certificate.")
        fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 
2*time.Minute, "The timeout for the node registry")
        fs.DurationVar(&s.etcdFullSyncInterval, "etcd-full-sync-interval", 
30*time.Minute, "The interval for full sync etcd")
+
+       // DNS-based node discovery configuration
+       fs.BoolVar(&s.dnsRegistryEnabled, "node-registry-dns-enabled", false,
+               "Is DNS registry enabled")
+       fs.StringSliceVar(&s.dnsSRVAddresses, 
"node-registry-dns-srv-addresses", []string{},
+               "DNS SRV addresses for node discovery (e.g., 
_grpc._tcp.banyandb.svc.cluster.local)")
+       fs.DurationVar(&s.dnsFetchInitInterval, 
"node-registry-dns-fetch-init-interval", 1*time.Second,
+               "DNS query interval during initialization phase")
+       fs.DurationVar(&s.dnsFetchInitDuration, 
"node-registry-dns-fetch-init-duration", 30*time.Second,
+               "Duration of the initialization phase for DNS discovery")
+       fs.DurationVar(&s.dnsFetchInterval, "node-registry-dns-fetch-interval", 
10*time.Second,
+               "DNS query interval after initialization phase")
+       fs.DurationVar(&s.grpcTimeout, "node-registry-grpc-timeout", 
5*time.Second,
+               "Timeout for gRPC calls to fetch node metadata")
+       fs.BoolVar(&s.dnsTLSEnabled, "node-registry-dns-tls", false,
+               "Enable TLS for DNS discovery gRPC connections")
+       fs.StringVar(&s.dnsCACertPath, "node-registry-dns-ca-cert", "",
+               "CA certificate file to verify DNS discovered nodes")
+
        return fs
 }
 
 func (s *clientService) Validate() error {
        if s.endpoints == nil {
                return errors.New("endpoints is empty")
        }
+       if s.dnsRegistryEnabled && s.dnsTLSEnabled && s.dnsCACertPath == "" {
+               return fmt.Errorf("DNS TLS is enabled, but no CA certificate 
file was provided")
+       }
        return nil

Review Comment:
   DNS discovery is initialized, but validation only checks dnsCACertPath when TLS is enabled. It should also validate that dnsSRVAddresses is not empty when dnsRegistryEnabled is true, so misconfiguration fails fast rather than at runtime.
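
   A possible shape for that check, mirroring the existing TLS validation and assuming the field names used by the flag definitions above:

   ```go
        if s.dnsRegistryEnabled && len(s.dnsSRVAddresses) == 0 {
                return errors.New("DNS registry is enabled, but no SRV addresses were provided")
        }
   ```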



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.dnsQueryCount.Inc(1)
+                       s.metrics.dnsQueryDuration.Observe(duration.Seconds())
+                       s.metrics.dnsQueryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       allAddresses := make(map[string]bool)
+       var queryErrors []error
+
+       for _, srvAddr := range s.srvAddresses {
+               _, addrs, lookupErr := s.resolver.LookupSRV(ctx, srvAddr)
+               if lookupErr != nil {
+                       queryErrors = append(queryErrors, fmt.Errorf("lookup %s 
failed: %w", srvAddr, lookupErr))
+                       continue
+               }
+
+               for _, srv := range addrs {
+                       address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+                       allAddresses[address] = true
+               }
+       }
+
+       // if there have any error occurred,
+       // then just return the query error to ignore the result to make sure 
the cache correct
+       if len(queryErrors) > 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(queryErrors...)
+       }
+
+       // convert map to slice
+       result := make([]string, 0, len(allAddresses))
+       for addr := range allAddresses {
+               result = append(result, addr)
+       }
+
+       return result, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC

Review Comment:
   Comment says "fetch metrics from gRPC" but the code actually fetches node 
metadata, not metrics. The comment should be corrected.
   ```suggestion
                        // fetch node metadata from gRPC
   ```



##########
banyand/metadata/dns/dns.go:
##########
@@ -0,0 +1,508 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dns implements DNS-based node discovery for distributed metadata 
management.
+package dns
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "net"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+// Service implements DNS-based node discovery.
+type Service struct {
+       lastQueryTime     time.Time
+       resolver          Resolver
+       caCertReloader    *pkgtls.Reloader
+       nodeCache         map[string]*databasev1.Node
+       closer            *run.Closer
+       log               *logger.Logger
+       metrics           *metrics
+       handlers          map[string]schema.EventHandler
+       caCertPath        string
+       srvAddresses      []string
+       lastSuccessfulDNS []string
+       pollInterval      time.Duration
+       initInterval      time.Duration
+       initDuration      time.Duration
+       grpcTimeout       time.Duration
+       cacheMutex        sync.RWMutex
+       lastSuccessMutex  sync.RWMutex
+       handlersMutex     sync.RWMutex
+       lastQueryMutex    sync.RWMutex
+       tlsEnabled        bool
+}
+
+// Config holds configuration for DNS discovery service.
+type Config struct {
+       CACertPath   string
+       SRVAddresses []string
+       InitInterval time.Duration
+       InitDuration time.Duration
+       PollInterval time.Duration
+       GRPCTimeout  time.Duration
+       TLSEnabled   bool
+}
+
+// Resolver defines the interface for DNS SRV lookups.
+type Resolver interface {
+       LookupSRV(ctx context.Context, name string) (string, []*net.SRV, error)
+}
+
+// defaultResolver wraps net.DefaultResolver to implement Resolver.
+type defaultResolver struct{}
+
+func (d *defaultResolver) LookupSRV(ctx context.Context, name string) (string, 
[]*net.SRV, error) {
+       return net.DefaultResolver.LookupSRV(ctx, "", "", name)
+}
+
+// NewService creates a new DNS discovery service.
+func NewService(cfg Config) (*Service, error) {
+       // Validation
+       if len(cfg.SRVAddresses) == 0 {
+               return nil, errors.New("DNS SRV addresses cannot be empty")
+       }
+
+       svc := &Service{
+               srvAddresses:      cfg.SRVAddresses,
+               initInterval:      cfg.InitInterval,
+               initDuration:      cfg.InitDuration,
+               pollInterval:      cfg.PollInterval,
+               grpcTimeout:       cfg.GRPCTimeout,
+               tlsEnabled:        cfg.TLSEnabled,
+               caCertPath:        cfg.CACertPath,
+               nodeCache:         make(map[string]*databasev1.Node),
+               handlers:          make(map[string]schema.EventHandler),
+               lastSuccessfulDNS: []string{},
+               closer:            run.NewCloser(1),
+               log:               logger.GetLogger("metadata-discovery-dns"),
+               resolver:          &defaultResolver{},
+       }
+
+       if svc.tlsEnabled && svc.caCertPath != "" {
+               var err error
+               svc.caCertReloader, err = 
pkgtls.NewClientCertReloader(svc.caCertPath, svc.log)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", err)
+               }
+               svc.log.Debug().Str("caCertPath", 
svc.caCertPath).Msg("Initialized DNS CA certificate reloader")
+       }
+
+       return svc, nil
+}
+
+// newServiceWithResolver creates a service with a custom resolver (for 
testing).
+func newServiceWithResolver(cfg Config, resolver Resolver) (*Service, error) {
+       svc, err := NewService(cfg)
+       if err != nil {
+               return nil, err
+       }
+       svc.resolver = resolver
+       return svc, nil
+}
+
+func (s *Service) getTLSDialOptions() ([]grpc.DialOption, error) {
+       if !s.tlsEnabled {
+               return 
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
+       }
+
+       if s.caCertReloader != nil {
+               tlsConfig, err := s.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       opts, err := grpchelper.SecureOptions(nil, s.tlsEnabled, false, 
s.caCertPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to load TLS config: %w", err)
+       }
+       return opts, nil
+}
+
+// Start begins the DNS discovery background process.
+func (s *Service) Start(ctx context.Context) error {
+       s.log.Debug().Msg("Starting DNS-based node discovery service")
+
+       go s.discoveryLoop(ctx)
+
+       return nil
+}
+
+func (s *Service) discoveryLoop(ctx context.Context) {
+       // add the init phase finish time
+       initPhaseEnd := time.Now().Add(s.initDuration)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-s.closer.CloseNotify():
+                       return
+               default:
+               }
+
+               if err := s.queryDNSAndUpdateNodes(ctx); err != nil {
+                       s.log.Err(err).Msg("failed to query DNS and update 
nodes")
+               }
+
+               // wait for next interval
+               var interval time.Duration
+               if time.Now().Before(initPhaseEnd) {
+                       interval = s.initInterval
+               } else {
+                       interval = s.pollInterval
+               }
+               time.Sleep(interval)
+       }
+}
+
+func (s *Service) queryDNSAndUpdateNodes(ctx context.Context) error {
+       // Record summary metrics
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.discoveryCount.Inc(1)
+                       s.metrics.discoveryDuration.Observe(duration.Seconds())
+                       s.metrics.discoveryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       addresses, queryErr := s.queryAllSRVRecords(ctx)
+
+       if queryErr != nil {
+               s.log.Warn().Err(queryErr).Msg("DNS query failed, using last 
successful cache")
+
+               // Use last successful cache
+               s.lastSuccessMutex.RLock()
+               addresses = s.lastSuccessfulDNS
+               s.lastSuccessMutex.RUnlock()
+
+               if len(addresses) == 0 {
+                       if s.metrics != nil {
+                               s.metrics.discoveryFailedCount.Inc(1)
+                       }
+                       return fmt.Errorf("DNS query failed and no cached 
addresses available: %w", queryErr)
+               }
+       } else {
+               // Update last successful cache
+               s.lastSuccessMutex.Lock()
+               s.lastSuccessfulDNS = addresses
+               s.lastSuccessMutex.Unlock()
+
+               if s.log.Debug().Enabled() {
+                       s.log.Debug().
+                               Int("count", len(addresses)).
+                               Strs("addresses", addresses).
+                               Strs("srv_addresses", s.srvAddresses).
+                               Msg("DNS query successful")
+               }
+       }
+
+       // Update node cache based on DNS results
+       updateErr := s.updateNodeCache(ctx, addresses)
+       if updateErr != nil && s.metrics != nil {
+               s.metrics.discoveryFailedCount.Inc(1)
+       }
+       s.lastQueryMutex.Lock()
+       s.lastQueryTime = time.Now()
+       s.lastQueryMutex.Unlock()
+       return updateErr
+}
+
+func (s *Service) queryAllSRVRecords(ctx context.Context) ([]string, error) {
+       startTime := time.Now()
+       defer func() {
+               if s.metrics != nil {
+                       duration := time.Since(startTime)
+                       s.metrics.dnsQueryCount.Inc(1)
+                       s.metrics.dnsQueryDuration.Observe(duration.Seconds())
+                       s.metrics.dnsQueryTotalDuration.Inc(duration.Seconds())
+               }
+       }()
+
+       allAddresses := make(map[string]bool)
+       var queryErrors []error
+
+       for _, srvAddr := range s.srvAddresses {
+               _, addrs, lookupErr := s.resolver.LookupSRV(ctx, srvAddr)
+               if lookupErr != nil {
+                       queryErrors = append(queryErrors, fmt.Errorf("lookup %s 
failed: %w", srvAddr, lookupErr))
+                       continue
+               }
+
+               for _, srv := range addrs {
+                       address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+                       allAddresses[address] = true
+               }
+       }
+
+       // if there have any error occurred,
+       // then just return the query error to ignore the result to make sure 
the cache correct
+       if len(queryErrors) > 0 {
+               if s.metrics != nil {
+                       s.metrics.dnsQueryFailedCount.Inc(1)
+               }
+               return nil, errors.Join(queryErrors...)
+       }
+
+       // convert map to slice
+       result := make([]string, 0, len(allAddresses))
+       for addr := range allAddresses {
+               result = append(result, addr)
+       }
+
+       return result, nil
+}
+
+func (s *Service) updateNodeCache(ctx context.Context, addresses []string) 
error {
+       addressSet := make(map[string]bool)
+       for _, addr := range addresses {
+               addressSet[addr] = true
+       }
+
+       var addErrors []error
+
+       for addr := range addressSet {
+               s.cacheMutex.RLock()
+               _, exists := s.nodeCache[addr]
+               s.cacheMutex.RUnlock()
+
+               if !exists {
+                       // fetch metrics from gRPC
+                       node, fetchErr := s.fetchNodeMetadata(ctx, addr)
+                       if fetchErr != nil {
+                               s.log.Warn().
+                                       Err(fetchErr).
+                                       Str("address", addr).
+                                       Msg("Failed to fetch node metadata")
+                               addErrors = append(addErrors, fetchErr)
+                               continue
+                       }
+
+                       // update cache and notify handlers
+                       s.cacheMutex.Lock()
+                       s.nodeCache[addr] = node
+                       s.cacheMutex.Unlock()
+
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, true)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("New node discovered and added to cache")
+               }
+       }
+
+       s.cacheMutex.Lock()
+       for addr, node := range s.nodeCache {
+               if !addressSet[addr] {
+                       // update cache and notify the handlers
+                       delete(s.nodeCache, addr)
+
+                       s.log.Debug().
+                               Str("address", addr).
+                               Str("name", node.GetMetadata().GetName()).
+                               Msg("Node removed from cache (no longer in 
DNS)")
+
+                       s.cacheMutex.Unlock()
+                       s.notifyHandlers(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind: schema.KindNode,
+                                       Name: node.GetMetadata().GetName(),
+                               },
+                               Spec: node,
+                       }, false)
+                       s.cacheMutex.Lock()
+               }
+       }
+       cacheSize := len(s.nodeCache)
+       s.cacheMutex.Unlock()

Review Comment:
   The lock is released and re-acquired during handler notification, which 
could lead to inconsistent state if another goroutine modifies the cache 
between the unlock and lock. Consider collecting all nodes to delete first, 
then releasing the lock once before notifying handlers.
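
   One way to restructure the removal pass along those lines, sketched with the identifiers used above (delete under the lock, notify after it is released):

   ```go
        // Remove stale entries while holding the lock, but defer handler
        // notification until after the lock is released.
        s.cacheMutex.Lock()
        var removed []*databasev1.Node
        for addr, node := range s.nodeCache {
                if !addressSet[addr] {
                        delete(s.nodeCache, addr)
                        removed = append(removed, node)
                }
        }
        cacheSize := len(s.nodeCache)
        s.cacheMutex.Unlock()

        for _, node := range removed {
                s.notifyHandlers(schema.Metadata{
                        TypeMeta: schema.TypeMeta{
                                Kind: schema.KindNode,
                                Name: node.GetMetadata().GetName(),
                        },
                        Spec: node,
                }, false)
        }
   ```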



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

