Copilot commented on code in PR #882:
URL: https://github.com/apache/skywalking-banyandb/pull/882#discussion_r2589562039


##########
banyand/queue/pub/pub.go:
##########
@@ -126,6 +134,32 @@ func (p *pub) GracefulStop() {
 
 // Serve implements run.Service.
 func (p *pub) Serve() run.StopNotify {
+       // Start CA certificate reloader if enabled
+       if p.caCertReloader != nil {
+               if err := p.caCertReloader.Start(); err != nil {
+                       p.log.Error().Err(err).Msg("Failed to start CA certificate reloader")
+                       stopCh := p.closer.CloseNotify()
+                       return stopCh
+               }
+               p.log.Info().Str("caCertPath", p.caCertPath).Msg("Started CA certificate file monitoring")
+
+               // Listen for certificate update events
+               certUpdateCh := p.caCertReloader.GetUpdateChannel()
+               stopCh := p.closer.CloseNotify()
+               go func() {
+                       for {
+                               select {
+                               case <-certUpdateCh:
+                                       p.log.Info().Msg("CA certificate updated, reconnecting clients")
+                                       p.reconnectAllClients()
+                               case <-stopCh:
+                                       return
+                               }
+                       }
+               }()

Review Comment:
   The goroutine launched in `Serve()` to listen for certificate updates is not tracked by the closer, so `GracefulStop()` cannot wait for it to exit when the service is stopped.
   
   Consider using `p.closer.AddRunning()` before launching the goroutine and 
`defer p.closer.Done()` inside it, similar to how other goroutines are managed 
in the codebase (e.g., in `reconnectAllClients` and `startEvictRetry`).
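   
   A minimal sketch of that pattern, reusing the names from the quoted diff (`certUpdateCh`, `stopCh`, `p.reconnectAllClients`); whether `AddRunning` can return false this early depends on the closer's state during startup:
   ```go
   if p.closer.AddRunning() {
           go func() {
                   defer p.closer.Done()
                   for {
                           select {
                           case <-certUpdateCh:
                                   p.log.Info().Msg("CA certificate updated, reconnecting clients")
                                   p.reconnectAllClients()
                           case <-stopCh:
                                   return
                           }
                   }
           }()
   }
   ```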



##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
 }
 
 func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+       if !p.tlsEnabled {
+               return grpchelper.SecureOptions(nil, false, false, "")
+       }
+
+       // Use reloader if available (for dynamic reloading)
+       if p.caCertReloader != nil {
+               // Extract server name from the connection (we'll use a default for now)
+               // The actual server name will be validated by the TLS handshake
+               tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+       }
+
+       // Fallback to static file reading if reloader is not available
        opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
        if err != nil {
                return nil, fmt.Errorf("failed to load TLS config: %w", err)
        }
        return opts, nil
 }
 
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Collect all active nodes
+       nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+       for name, client := range p.active {
+               if node := p.registered[name]; node != nil {
+                       nodesToReconnect = append(nodesToReconnect, node)
+                       // Close existing connection
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       p.deleteClient(client.md)
+               }
+       }
+
+       // Reconnect all nodes asynchronously
+       for _, node := range nodesToReconnect {
+               node := node // capture loop variable
+               if !p.closer.AddRunning() {
+                       continue
+               }
+               go func() {
+                       defer p.closer.Done()
+                       // Wait a bit to ensure the old connection is fully closed
+                       time.Sleep(200 * time.Millisecond)
+
+                       credOpts, err := p.getClientTransportCredentials()
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                               grpc.WithDefaultServiceConfig(p.retryPolicy),
+                               grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       // Check health before adding back to active
+                       if !p.healthCheck(node.String(), conn) {
+                               _ = conn.Close()
+                               p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       // Add back to active
+                       p.mu.Lock()
+                       name := node.Metadata.GetName()
+                       if _, exists := p.registered[name]; exists {
+                               c := clusterv1.NewServiceClient(conn)
+                               md := schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                       },
+                                       Spec: node,
+                               }
+                               p.active[name] = &client{conn: conn, client: c, md: md}
+                               p.addClient(md)
+                               p.recordSuccess(name)
+                               p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update")
+                       }
+                       p.mu.Unlock()
+               }()
+       }
+}
+
+// startEvictRetry starts a retry goroutine for a node in evictable queue.
+// This is used when reconnection fails after CA certificate update.
+func (p *pub) startEvictRetry(name string, node *databasev1.Node, md schema.Metadata) {
+       if !p.closer.AddRunning() {
+               return
+       }
+       go func() {
+               defer p.closer.Done()
+               attempt := 0
+               // Get evictNode from evictable map (it should already be added before calling this function)
+               p.mu.RLock()
+               en, exists := p.evictable[name]
+               p.mu.RUnlock()
+               if !exists {
+                       p.log.Warn().Str("node", name).Msg("node not found in evictable queue, skipping retry")
+                       return
+               }
+               for {
+                       backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
+                       select {
+                       case <-time.After(backoff):
+                               credOpts, errEvict := p.getClientTransportCredentials()
+                               if errEvict != nil {
+                                       p.log.Error().Err(errEvict).Str("node", name).Msg("failed to load client TLS credentials (evict retry)")
+                                       return
+                               }
+                               connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                                       grpc.WithDefaultServiceConfig(p.retryPolicy),
+                                       grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
+                               if errEvict == nil && p.healthCheck(node.String(), connEvict) {
+                                       func() {
+                                               p.mu.Lock()
+                                               defer p.mu.Unlock()
+                                               if _, ok := p.evictable[name]; !ok {
+                                                       // The client has been removed from evict clients map, just return
+                                                       return
+                                               }
+                                               c := clusterv1.NewServiceClient(connEvict)
+                                               p.active[name] = &client{conn: connEvict, client: c, md: md}
+                                               p.addClient(md)
+                                               delete(p.evictable, name)
+                                               // Reset circuit breaker state to closed
+                                               p.recordSuccess(name)
+                                               p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update (retry)")
+                                       }()
+                                       return
+                               }
+                               _ = connEvict.Close()
+                               if _, ok := p.registered[name]; !ok {
+                                       return
+                               }
+                               p.log.Warn().Err(errEvict).Str("node", name).Dur("backoff", backoff).Msg("failed to re-connect to grpc server after CA cert update, will retry")
+                       case <-en.c:

Review Comment:
   There's a potential race condition between the `startEvictRetry` goroutine receiving from `en.c` and the `GracefulStop()` or `registerNode()` methods closing it.
   
   `startEvictRetry` snapshots `en.c` from the map at line 642-643 and then receives from it without holding the lock, while the channel can be closed by `GracefulStop` (line 124) or `registerNode` (line 207) at any time. Receiving from a closed channel is well defined in Go and returns immediately, but the goroutine may then continue against an entry that has already been removed from or replaced in the evictable map.
   
   Consider checking that the node still exists in the evictable map before each select operation, or re-reading the entry under the lock on each iteration.
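   
   A sketch of the first option, re-validating the evictable entry on each iteration; the names come from the quoted diff and the dial/health-check body is elided:
   ```go
   for {
           p.mu.RLock()
           en, ok := p.evictable[name]
           p.mu.RUnlock()
           if !ok {
                   return // entry removed or replaced; stop retrying
           }
           backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
           select {
           case <-time.After(backoff):
                   // ... dial and health-check as in the quoted diff ...
           case <-en.c:
                   return
           }
   }
   ```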



##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
 }
 
 func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+       if !p.tlsEnabled {
+               return grpchelper.SecureOptions(nil, false, false, "")
+       }
+
+       // Use reloader if available (for dynamic reloading)
+       if p.caCertReloader != nil {
+               // Extract server name from the connection (we'll use a default for now)
+               // The actual server name will be validated by the TLS handshake
+               tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+       }
+
+       // Fallback to static file reading if reloader is not available
        opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
        if err != nil {
                return nil, fmt.Errorf("failed to load TLS config: %w", err)
        }
        return opts, nil
 }
 
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Collect all active nodes
+       nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+       for name, client := range p.active {
+               if node := p.registered[name]; node != nil {
+                       nodesToReconnect = append(nodesToReconnect, node)
+                       // Close existing connection
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       p.deleteClient(client.md)
+               }
+       }
+
+       // Reconnect all nodes asynchronously
+       for _, node := range nodesToReconnect {
+               node := node // capture loop variable
+               if !p.closer.AddRunning() {
+                       continue
+               }
+               go func() {
+                       defer p.closer.Done()
+                       // Wait a bit to ensure the old connection is fully closed
+                       time.Sleep(200 * time.Millisecond)
+
+                       credOpts, err := p.getClientTransportCredentials()
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                               grpc.WithDefaultServiceConfig(p.retryPolicy),
+                               grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       // Check health before adding back to active
+                       if !p.healthCheck(node.String(), conn) {
+                               _ = conn.Close()
+                               p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       // Add back to active
+                       p.mu.Lock()
+                       name := node.Metadata.GetName()
+                       if _, exists := p.registered[name]; exists {
+                               c := clusterv1.NewServiceClient(conn)
+                               md := schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind: schema.KindNode,
+                                       },
+                                       Spec: node,
+                               }
+                               p.active[name] = &client{conn: conn, client: c, md: md}
+                               p.addClient(md)
+                               p.recordSuccess(name)
+                               p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update")
+                       }
+                       p.mu.Unlock()
+               }()
+       }
+}
+
+// startEvictRetry starts a retry goroutine for a node in evictable queue.
+// This is used when reconnection fails after CA certificate update.
+func (p *pub) startEvictRetry(name string, node *databasev1.Node, md schema.Metadata) {
+       if !p.closer.AddRunning() {
+               return
+       }
+       go func() {
+               defer p.closer.Done()
+               attempt := 0
+               // Get evictNode from evictable map (it should already be added before calling this function)
+               p.mu.RLock()
+               en, exists := p.evictable[name]
+               p.mu.RUnlock()
+               if !exists {
+                       p.log.Warn().Str("node", name).Msg("node not found in evictable queue, skipping retry")
+                       return
+               }
+               for {
+                       backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
+                       select {
+                       case <-time.After(backoff):
+                               credOpts, errEvict := p.getClientTransportCredentials()
+                               if errEvict != nil {
+                                       p.log.Error().Err(errEvict).Str("node", name).Msg("failed to load client TLS credentials (evict retry)")
+                                       return
+                               }
+                               connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                                       grpc.WithDefaultServiceConfig(p.retryPolicy),
+                                       grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
+                               if errEvict == nil && p.healthCheck(node.String(), connEvict) {
+                                       func() {
+                                               p.mu.Lock()
+                                               defer p.mu.Unlock()
+                                               if _, ok := p.evictable[name]; !ok {
+                                                       // The client has been removed from evict clients map, just return
+                                                       return
+                                               }
+                                               c := clusterv1.NewServiceClient(connEvict)
+                                               p.active[name] = &client{conn: connEvict, client: c, md: md}
+                                               p.addClient(md)
+                                               delete(p.evictable, name)
+                                               // Reset circuit breaker state to closed
+                                               p.recordSuccess(name)
+                                               p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update (retry)")
+                                       }()
+                                       return
+                               }
+                               _ = connEvict.Close()

Review Comment:
   When the connection attempt or health check fails, `connEvict.Close()` is called unconditionally (line 678). However, if `grpc.NewClient` itself failed (line 657), `connEvict` might be nil, and the call would cause a nil pointer dereference.
   
   The code should check if `connEvict != nil` before attempting to close it:
   ```go
   if connEvict != nil {
       _ = connEvict.Close()
   }
   ```
   
   This pattern is already correctly used in `reconnectAllClients` at line 589.
   ```suggestion
                                if connEvict != nil {
                                        _ = connEvict.Close()
                                }
   ```



##########
docs/operation/security.md:
##########
@@ -123,19 +123,33 @@ BanyanDB supports enabling TLS for the internal gRPC queue between liaison and d
 
 The following flags are used to configure internal TLS:
 
-- `--internal-tls`: Enable TLS on the internal queue client inside Liaison; if false, the queue uses plain TCP.
-- `--internal-ca-cert <path>`: PEM‑encoded CA (or bundle) that the queue client uses to verify Data‑Node server certificates.
+- `--data-client-tls`: Enable TLS on the internal queue client inside Liaison; if false, the queue uses plain TCP.
+- `--data-client-ca-cert`: PEM‑encoded CA (or bundle) that the queue client uses to verify Data‑Node server certificates.
 
 Each Liaison/Data process still advertises its certificate with the public flags (`--tls`, `--cert-file`, `--key-file`). The same certificate/key pair can be reused for both external traffic and the internal queue.
 
 **Example: Enable internal TLS between liaison and data nodes**
 
 ```shell
-banyand liaison --internal-tls=true --internal-ca-cert=ca.crt --tls=true --cert-file=server.crt --key-file=server.key
+banyand liaison --data-client-tls=true --data-client-ca-cert=ca.crt --tls=true --cert-file=server.crt --key-file=server.key
 banyand data --tls=true --cert-file=server.crt --key-file=server.key
 ```
 
-> Note: The `--internal-ca-cert` should point to the CA certificate used to sign the data node's server certificate.
+> Note: 
+> - The `--data-client-ca-cert` should point to the CA certificate used to sign the data node's server certificate.
+> - Data nodes act as servers and do not need a CA certificate to connect to liaison nodes (liaison nodes connect to data nodes, not vice versa).

Review Comment:
   [nitpick] The note states "Data nodes act as servers and do not need a CA 
certificate to connect to liaison nodes (liaison nodes connect to data nodes, 
not vice versa)."
   
   This statement could be clearer. Data nodes don't need a CA certificate 
because they act as **servers only** in this internal TLS setup - they don't 
make client connections to liaison nodes. Consider rephrasing for clarity:
   
   "Data nodes act as servers only and do not need a CA certificate because 
liaison nodes connect to data nodes (not vice versa)."
   ```suggestion
   > - Data nodes act as servers only and do not need a CA certificate because liaison nodes connect to data nodes (not vice versa).
   ```



##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
 }
 
 func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+       if !p.tlsEnabled {
+               return grpchelper.SecureOptions(nil, false, false, "")
+       }
+
+       // Use reloader if available (for dynamic reloading)
+       if p.caCertReloader != nil {
+               // Extract server name from the connection (we'll use a default for now)
+               // The actual server name will be validated by the TLS handshake
+               tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+       }
+
+       // Fallback to static file reading if reloader is not available
        opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
        if err != nil {
                return nil, fmt.Errorf("failed to load TLS config: %w", err)
        }
        return opts, nil
 }
 
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Collect all active nodes
+       nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+       for name, client := range p.active {
+               if node := p.registered[name]; node != nil {
+                       nodesToReconnect = append(nodesToReconnect, node)
+                       // Close existing connection
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       p.deleteClient(client.md)
+               }
+       }
+
+       // Reconnect all nodes asynchronously
+       for _, node := range nodesToReconnect {
+               node := node // capture loop variable
+               if !p.closer.AddRunning() {
+                       continue
+               }
+               go func() {
+                       defer p.closer.Done()
+                       // Wait a bit to ensure the old connection is fully closed
+                       time.Sleep(200 * time.Millisecond)
+
+                       credOpts, err := p.getClientTransportCredentials()
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                               grpc.WithDefaultServiceConfig(p.retryPolicy),
+                               grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)

Review Comment:
   The hardcoded value `32<<20` for `MaxCallRecvMsgSize` should be replaced with the constant `maxReceiveMessageSize`, which is already defined in `client.go` and used consistently throughout the codebase.
   
   This ensures consistency across all gRPC client configurations and makes it 
easier to maintain if the value needs to be changed in the future.
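   
   A minimal sketch of the substitution, assuming `maxReceiveMessageSize` is the constant from `client.go` mentioned above:
   ```go
   conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
           grpc.WithDefaultServiceConfig(p.retryPolicy),
           grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
   ```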



##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
 }
 
 func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+       if !p.tlsEnabled {
+               return grpchelper.SecureOptions(nil, false, false, "")
+       }
+
+       // Use reloader if available (for dynamic reloading)
+       if p.caCertReloader != nil {
+               // Extract server name from the connection (we'll use a default for now)
+               // The actual server name will be validated by the TLS handshake
+               tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+       }
+
+       // Fallback to static file reading if reloader is not available
        opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
        if err != nil {
                return nil, fmt.Errorf("failed to load TLS config: %w", err)
        }
        return opts, nil
 }
 
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Collect all active nodes
+       nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+       for name, client := range p.active {
+               if node := p.registered[name]; node != nil {
+                       nodesToReconnect = append(nodesToReconnect, node)
+                       // Close existing connection
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       p.deleteClient(client.md)
+               }
+       }
+
+       // Reconnect all nodes asynchronously
+       for _, node := range nodesToReconnect {
+               node := node // capture loop variable
+               if !p.closer.AddRunning() {
+                       continue
+               }
+               go func() {
+                       defer p.closer.Done()
+                       // Wait a bit to ensure the old connection is fully closed
+                       time.Sleep(200 * time.Millisecond)
+
+                       credOpts, err := p.getClientTransportCredentials()
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                               grpc.WithDefaultServiceConfig(p.retryPolicy),
+                               grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+                       if err != nil {
+                               p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return
+                       }
+
+                       // Check health before adding back to active
+                       if !p.healthCheck(node.String(), conn) {
+                               _ = conn.Close()
+                               p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                               // Move to evictable queue for retry
+                               p.mu.Lock()
+                               name := node.Metadata.GetName()
+                               if _, exists := p.registered[name]; exists {
+                                       md := schema.Metadata{
+                                               TypeMeta: schema.TypeMeta{
+                                                       Kind: schema.KindNode,
+                                               },
+                                               Spec: node,
+                                       }
+                                       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                                       p.mu.Unlock()
+                                       p.startEvictRetry(name, node, md)
+                               } else {
+                                       p.mu.Unlock()
+                               }
+                               return

Review Comment:
   The logic for moving a node to the evictable queue and starting retry is 
duplicated three times in this function (lines 545-560, 569-584, and 592-607). 
   
   Consider extracting this into a helper method like 
`moveToEvictableAndRetry(name, node)` to reduce code duplication and improve 
maintainability. This would make the error handling paths clearer and ensure 
consistent behavior across all failure scenarios.
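   
   A sketch of such a helper under the proposed (hypothetical) name, assuming the fields and helpers shown in the quoted diff:
   ```go
   // moveToEvictableAndRetry parks a node in the evictable queue and starts
   // the retry loop, provided the node is still registered.
   func (p *pub) moveToEvictableAndRetry(node *databasev1.Node) {
           name := node.Metadata.GetName()
           p.mu.Lock()
           if _, exists := p.registered[name]; !exists {
                   p.mu.Unlock()
                   return
           }
           md := schema.Metadata{
                   TypeMeta: schema.TypeMeta{Kind: schema.KindNode},
                   Spec:     node,
           }
           p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
           p.mu.Unlock()
           p.startEvictRetry(name, node, md)
   }
   ```
   Each of the three failure branches could then collapse to `p.moveToEvictableAndRetry(node)` followed by `return`.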



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
