Copilot commented on code in PR #882:
URL: https://github.com/apache/skywalking-banyandb/pull/882#discussion_r2589562039
##########
banyand/queue/pub/pub.go:
##########
@@ -126,6 +134,32 @@ func (p *pub) GracefulStop() {
// Serve implements run.Service.
func (p *pub) Serve() run.StopNotify {
+    // Start CA certificate reloader if enabled
+    if p.caCertReloader != nil {
+        if err := p.caCertReloader.Start(); err != nil {
+            p.log.Error().Err(err).Msg("Failed to start CA certificate reloader")
+            stopCh := p.closer.CloseNotify()
+            return stopCh
+        }
+        p.log.Info().Str("caCertPath", p.caCertPath).Msg("Started CA certificate file monitoring")
+
+        // Listen for certificate update events
+        certUpdateCh := p.caCertReloader.GetUpdateChannel()
+        stopCh := p.closer.CloseNotify()
+        go func() {
+            for {
+                select {
+                case <-certUpdateCh:
+                    p.log.Info().Msg("CA certificate updated, reconnecting clients")
+                    p.reconnectAllClients()
+                case <-stopCh:
+                    return
+                }
+            }
+        }()
Review Comment:
The goroutine launched in `Serve()` to listen for certificate updates is not
properly tracked by the closer. This could lead to a goroutine leak when the
service is stopped.
Consider using `p.closer.AddRunning()` before launching the goroutine and
`defer p.closer.Done()` inside it, similar to how other goroutines are managed
in the codebase (e.g., in `reconnectAllClients` and `startEvictRetry`).
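A minimal sketch of that suggestion, assuming the closer exposes the same `AddRunning`/`Done` pair already used by `reconnectAllClients` and `startEvictRetry` in this file:
```go
// Hypothetical reshaping of the Serve() snippet above: the goroutine is only
// launched once the closer accepts it, and Done() is released on exit so
// GracefulStop can wait for it.
if p.closer.AddRunning() {
    go func() {
        defer p.closer.Done()
        for {
            select {
            case <-certUpdateCh:
                p.log.Info().Msg("CA certificate updated, reconnecting clients")
                p.reconnectAllClients()
            case <-stopCh:
                return
            }
        }
    }()
}
```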
##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
}
func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+    if !p.tlsEnabled {
+        return grpchelper.SecureOptions(nil, false, false, "")
+    }
+
+    // Use reloader if available (for dynamic reloading)
+    if p.caCertReloader != nil {
+        // Extract server name from the connection (we'll use a default for now)
+        // The actual server name will be validated by the TLS handshake
+        tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+        if err != nil {
+            return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+        }
+        creds := credentials.NewTLS(tlsConfig)
+        return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+    }
+
+    // Fallback to static file reading if reloader is not available
     opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
     if err != nil {
         return nil, fmt.Errorf("failed to load TLS config: %w", err)
     }
     return opts, nil
 }
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    // Collect all active nodes
+    nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+    for name, client := range p.active {
+        if node := p.registered[name]; node != nil {
+            nodesToReconnect = append(nodesToReconnect, node)
+            // Close existing connection
+            _ = client.conn.Close()
+            delete(p.active, name)
+            p.deleteClient(client.md)
+        }
+    }
+
+    // Reconnect all nodes asynchronously
+    for _, node := range nodesToReconnect {
+        node := node // capture loop variable
+        if !p.closer.AddRunning() {
+            continue
+        }
+        go func() {
+            defer p.closer.Done()
+            // Wait a bit to ensure the old connection is fully closed
+            time.Sleep(200 * time.Millisecond)
+
+            credOpts, err := p.getClientTransportCredentials()
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                grpc.WithDefaultServiceConfig(p.retryPolicy),
+                grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            // Check health before adding back to active
+            if !p.healthCheck(node.String(), conn) {
+                _ = conn.Close()
+                p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            // Add back to active
+            p.mu.Lock()
+            name := node.Metadata.GetName()
+            if _, exists := p.registered[name]; exists {
+                c := clusterv1.NewServiceClient(conn)
+                md := schema.Metadata{
+                    TypeMeta: schema.TypeMeta{
+                        Kind: schema.KindNode,
+                    },
+                    Spec: node,
+                }
+                p.active[name] = &client{conn: conn, client: c, md: md}
+                p.addClient(md)
+                p.recordSuccess(name)
+                p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update")
+            }
+            p.mu.Unlock()
+        }()
+    }
+}
+
+// startEvictRetry starts a retry goroutine for a node in evictable queue.
+// This is used when reconnection fails after CA certificate update.
+func (p *pub) startEvictRetry(name string, node *databasev1.Node, md schema.Metadata) {
+    if !p.closer.AddRunning() {
+        return
+    }
+    go func() {
+        defer p.closer.Done()
+        attempt := 0
+        // Get evictNode from evictable map (it should already be added before calling this function)
+        p.mu.RLock()
+        en, exists := p.evictable[name]
+        p.mu.RUnlock()
+        if !exists {
+            p.log.Warn().Str("node", name).Msg("node not found in evictable queue, skipping retry")
+            return
+        }
+        for {
+            backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
+            select {
+            case <-time.After(backoff):
+                credOpts, errEvict := p.getClientTransportCredentials()
+                if errEvict != nil {
+                    p.log.Error().Err(errEvict).Str("node", name).Msg("failed to load client TLS credentials (evict retry)")
+                    return
+                }
+                connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                    grpc.WithDefaultServiceConfig(p.retryPolicy),
+                    grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
+                if errEvict == nil && p.healthCheck(node.String(), connEvict) {
+                    func() {
+                        p.mu.Lock()
+                        defer p.mu.Unlock()
+                        if _, ok := p.evictable[name]; !ok {
+                            // The client has been removed from evict clients map, just return
+                            return
+                        }
+                        c := clusterv1.NewServiceClient(connEvict)
+                        p.active[name] = &client{conn: connEvict, client: c, md: md}
+                        p.addClient(md)
+                        delete(p.evictable, name)
+                        // Reset circuit breaker state to closed
+                        p.recordSuccess(name)
+                        p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update (retry)")
+                    }()
+                    return
+                }
+                _ = connEvict.Close()
+                if _, ok := p.registered[name]; !ok {
+                    return
+                }
+                p.log.Warn().Err(errEvict).Str("node", name).Dur("backoff", backoff).Msg("failed to re-connect to grpc server after CA cert update, will retry")
+            case <-en.c:
Review Comment:
There's a potential race condition between the `startEvictRetry` goroutine waiting on `en.c` and the `GracefulStop()` or `registerNode()` methods closing it.
`startEvictRetry` copies `en` out of the evictable map at lines 642-643 and then selects on `en.c` without holding the lock, but `GracefulStop` (line 124) or `registerNode` (line 207) may close that channel or replace the map entry while the goroutine is still running. The goroutine can then end up waiting on a stale channel and keep retrying a node that has already been removed or re-registered.
Consider checking whether the node still exists in the evictable map before each select iteration, or re-reading `en.c` under the read lock each time it is used.
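One possible shape of that fix, as a sketch only (it reuses the `evictable` map, `p.mu`, and the backoff call shown in the diff): re-read the entry under the read lock on every iteration, so a removed or replaced `evictNode` is observed before the next wait.
```go
for {
    p.mu.RLock()
    en, ok := p.evictable[name]
    p.mu.RUnlock()
    if !ok {
        // Entry was removed by GracefulStop/registerNode; stop retrying.
        return
    }
    backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
    select {
    case <-time.After(backoff):
        // ... existing reconnect attempt ...
    case <-en.c:
        // The node was evicted or the service is stopping.
        return
    }
}
```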
##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
}
func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+    if !p.tlsEnabled {
+        return grpchelper.SecureOptions(nil, false, false, "")
+    }
+
+    // Use reloader if available (for dynamic reloading)
+    if p.caCertReloader != nil {
+        // Extract server name from the connection (we'll use a default for now)
+        // The actual server name will be validated by the TLS handshake
+        tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+        if err != nil {
+            return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+        }
+        creds := credentials.NewTLS(tlsConfig)
+        return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+    }
+
+    // Fallback to static file reading if reloader is not available
     opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
     if err != nil {
         return nil, fmt.Errorf("failed to load TLS config: %w", err)
     }
     return opts, nil
 }
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    // Collect all active nodes
+    nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+    for name, client := range p.active {
+        if node := p.registered[name]; node != nil {
+            nodesToReconnect = append(nodesToReconnect, node)
+            // Close existing connection
+            _ = client.conn.Close()
+            delete(p.active, name)
+            p.deleteClient(client.md)
+        }
+    }
+
+    // Reconnect all nodes asynchronously
+    for _, node := range nodesToReconnect {
+        node := node // capture loop variable
+        if !p.closer.AddRunning() {
+            continue
+        }
+        go func() {
+            defer p.closer.Done()
+            // Wait a bit to ensure the old connection is fully closed
+            time.Sleep(200 * time.Millisecond)
+
+            credOpts, err := p.getClientTransportCredentials()
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                grpc.WithDefaultServiceConfig(p.retryPolicy),
+                grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            // Check health before adding back to active
+            if !p.healthCheck(node.String(), conn) {
+                _ = conn.Close()
+                p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            // Add back to active
+            p.mu.Lock()
+            name := node.Metadata.GetName()
+            if _, exists := p.registered[name]; exists {
+                c := clusterv1.NewServiceClient(conn)
+                md := schema.Metadata{
+                    TypeMeta: schema.TypeMeta{
+                        Kind: schema.KindNode,
+                    },
+                    Spec: node,
+                }
+                p.active[name] = &client{conn: conn, client: c, md: md}
+                p.addClient(md)
+                p.recordSuccess(name)
+                p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update")
+            }
+            p.mu.Unlock()
+        }()
+    }
+}
+
+// startEvictRetry starts a retry goroutine for a node in evictable queue.
+// This is used when reconnection fails after CA certificate update.
+func (p *pub) startEvictRetry(name string, node *databasev1.Node, md schema.Metadata) {
+    if !p.closer.AddRunning() {
+        return
+    }
+    go func() {
+        defer p.closer.Done()
+        attempt := 0
+        // Get evictNode from evictable map (it should already be added before calling this function)
+        p.mu.RLock()
+        en, exists := p.evictable[name]
+        p.mu.RUnlock()
+        if !exists {
+            p.log.Warn().Str("node", name).Msg("node not found in evictable queue, skipping retry")
+            return
+        }
+        for {
+            backoff := jitteredBackoff(initBackoff, maxBackoff, attempt, defaultJitterFactor)
+            select {
+            case <-time.After(backoff):
+                credOpts, errEvict := p.getClientTransportCredentials()
+                if errEvict != nil {
+                    p.log.Error().Err(errEvict).Str("node", name).Msg("failed to load client TLS credentials (evict retry)")
+                    return
+                }
+                connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                    grpc.WithDefaultServiceConfig(p.retryPolicy),
+                    grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
+                if errEvict == nil && p.healthCheck(node.String(), connEvict) {
+                    func() {
+                        p.mu.Lock()
+                        defer p.mu.Unlock()
+                        if _, ok := p.evictable[name]; !ok {
+                            // The client has been removed from evict clients map, just return
+                            return
+                        }
+                        c := clusterv1.NewServiceClient(connEvict)
+                        p.active[name] = &client{conn: connEvict, client: c, md: md}
+                        p.addClient(md)
+                        delete(p.evictable, name)
+                        // Reset circuit breaker state to closed
+                        p.recordSuccess(name)
+                        p.log.Info().Str("node", name).Msg("successfully reconnected client after CA certificate update (retry)")
+                    }()
+                    return
+                }
+                _ = connEvict.Close()
Review Comment:
When a connection fails to be created (line 678), `connEvict.Close()` is
called unconditionally. However, if `grpc.NewClient` failed (line 657),
`connEvict` might be nil, which would cause a nil pointer dereference.
The code should check if `connEvict != nil` before attempting to close it:
```go
if connEvict != nil {
_ = connEvict.Close()
}
```
This pattern is already correctly used in `reconnectAllClients` at line 589.
```suggestion
if connEvict != nil {
_ = connEvict.Close()
}
```
##########
docs/operation/security.md:
##########
@@ -123,19 +123,33 @@ BanyanDB supports enabling TLS for the internal gRPC queue between liaison and d

The following flags are used to configure internal TLS:

-- `--internal-tls`: Enable TLS on the internal queue client inside Liaison; if false, the queue uses plain TCP.
-- `--internal-ca-cert <path>`: PEM‑encoded CA (or bundle) that the queue client uses to verify Data‑Node server certificates.
+- `--data-client-tls`: Enable TLS on the internal queue client inside Liaison; if false, the queue uses plain TCP.
+- `--data-client-ca-cert`: PEM‑encoded CA (or bundle) that the queue client uses to verify Data‑Node server certificates.

Each Liaison/Data process still advertises its certificate with the public flags (`--tls`, `--cert-file`, `--key-file`). The same certificate/key pair can be reused for both external traffic and the internal queue.

**Example: Enable internal TLS between liaison and data nodes**

```shell
-banyand liaison --internal-tls=true --internal-ca-cert=ca.crt --tls=true --cert-file=server.crt --key-file=server.key
+banyand liaison --data-client-tls=true --data-client-ca-cert=ca.crt --tls=true --cert-file=server.crt --key-file=server.key
banyand data --tls=true --cert-file=server.crt --key-file=server.key
```

-> Note: The `--internal-ca-cert` should point to the CA certificate used to sign the data node's server certificate.
+> Note:
+> - The `--data-client-ca-cert` should point to the CA certificate used to sign the data node's server certificate.
+> - Data nodes act as servers and do not need a CA certificate to connect to liaison nodes (liaison nodes connect to data nodes, not vice versa).
Review Comment:
[nitpick] The note states "Data nodes act as servers and do not need a CA
certificate to connect to liaison nodes (liaison nodes connect to data nodes,
not vice versa)."
This statement could be clearer. Data nodes don't need a CA certificate
because they act as **servers only** in this internal TLS setup - they don't
make client connections to liaison nodes. Consider rephrasing for clarity:
"Data nodes act as servers only and do not need a CA certificate because
liaison nodes connect to data nodes (not vice versa)."
```suggestion
> - Data nodes act as servers only and do not need a CA certificate because liaison nodes connect to data nodes (not vice versa).
```
##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
}
func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+    if !p.tlsEnabled {
+        return grpchelper.SecureOptions(nil, false, false, "")
+    }
+
+    // Use reloader if available (for dynamic reloading)
+    if p.caCertReloader != nil {
+        // Extract server name from the connection (we'll use a default for now)
+        // The actual server name will be validated by the TLS handshake
+        tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+        if err != nil {
+            return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+        }
+        creds := credentials.NewTLS(tlsConfig)
+        return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+    }
+
+    // Fallback to static file reading if reloader is not available
     opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
     if err != nil {
         return nil, fmt.Errorf("failed to load TLS config: %w", err)
     }
     return opts, nil
 }
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    // Collect all active nodes
+    nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+    for name, client := range p.active {
+        if node := p.registered[name]; node != nil {
+            nodesToReconnect = append(nodesToReconnect, node)
+            // Close existing connection
+            _ = client.conn.Close()
+            delete(p.active, name)
+            p.deleteClient(client.md)
+        }
+    }
+
+    // Reconnect all nodes asynchronously
+    for _, node := range nodesToReconnect {
+        node := node // capture loop variable
+        if !p.closer.AddRunning() {
+            continue
+        }
+        go func() {
+            defer p.closer.Done()
+            // Wait a bit to ensure the old connection is fully closed
+            time.Sleep(200 * time.Millisecond)
+
+            credOpts, err := p.getClientTransportCredentials()
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                grpc.WithDefaultServiceConfig(p.retryPolicy),
+                grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
Review Comment:
The hardcoded value `32<<20` for MaxCallRecvMsgSize should be replaced with
the constant `maxReceiveMessageSize` which is already defined in `client.go`
and used consistently throughout the codebase.
This ensures consistency across all gRPC client configurations and makes it
easier to maintain if the value needs to be changed in the future.
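For comparison, the evict-retry path later in the same hunk already dials with the constant; the reconnect path could be written the same way (a sketch, assuming `maxReceiveMessageSize` remains defined in `client.go`):
```go
// Same dial as above, but using the shared constant instead of 32<<20.
conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
    grpc.WithDefaultServiceConfig(p.retryPolicy),
    grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxReceiveMessageSize)))...)
```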
##########
banyand/queue/pub/pub.go:
##########
@@ -441,13 +486,210 @@ func isFailoverError(err error) bool {
}
func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+    if !p.tlsEnabled {
+        return grpchelper.SecureOptions(nil, false, false, "")
+    }
+
+    // Use reloader if available (for dynamic reloading)
+    if p.caCertReloader != nil {
+        // Extract server name from the connection (we'll use a default for now)
+        // The actual server name will be validated by the TLS handshake
+        tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+        if err != nil {
+            return nil, fmt.Errorf("failed to get TLS config from reloader: %w", err)
+        }
+        creds := credentials.NewTLS(tlsConfig)
+        return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
+    }
+
+    // Fallback to static file reading if reloader is not available
     opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, p.caCertPath)
     if err != nil {
         return nil, fmt.Errorf("failed to load TLS config: %w", err)
     }
     return opts, nil
 }
+// reconnectAllClients reconnects all active clients when CA certificate is updated.
+func (p *pub) reconnectAllClients() {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+
+    // Collect all active nodes
+    nodesToReconnect := make([]*databasev1.Node, 0, len(p.active))
+    for name, client := range p.active {
+        if node := p.registered[name]; node != nil {
+            nodesToReconnect = append(nodesToReconnect, node)
+            // Close existing connection
+            _ = client.conn.Close()
+            delete(p.active, name)
+            p.deleteClient(client.md)
+        }
+    }
+
+    // Reconnect all nodes asynchronously
+    for _, node := range nodesToReconnect {
+        node := node // capture loop variable
+        if !p.closer.AddRunning() {
+            continue
+        }
+        go func() {
+            defer p.closer.Done()
+            // Wait a bit to ensure the old connection is fully closed
+            time.Sleep(200 * time.Millisecond)
+
+            credOpts, err := p.getClientTransportCredentials()
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to load client TLS credentials during reconnect")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            conn, err := grpc.NewClient(node.GrpcAddress, append(credOpts,
+                grpc.WithDefaultServiceConfig(p.retryPolicy),
+                grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32<<20)))...)
+            if err != nil {
+                p.log.Error().Err(err).Str("node", node.Metadata.GetName()).Msg("failed to reconnect to grpc server")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
+            }
+
+            // Check health before adding back to active
+            if !p.healthCheck(node.String(), conn) {
+                _ = conn.Close()
+                p.log.Warn().Str("node", node.Metadata.GetName()).Msg("reconnected node is unhealthy, will retry")
+                // Move to evictable queue for retry
+                p.mu.Lock()
+                name := node.Metadata.GetName()
+                if _, exists := p.registered[name]; exists {
+                    md := schema.Metadata{
+                        TypeMeta: schema.TypeMeta{
+                            Kind: schema.KindNode,
+                        },
+                        Spec: node,
+                    }
+                    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+                    p.mu.Unlock()
+                    p.startEvictRetry(name, node, md)
+                } else {
+                    p.mu.Unlock()
+                }
+                return
Review Comment:
The logic for moving a node to the evictable queue and starting retry is
duplicated three times in this function (lines 545-560, 569-584, and 592-607).
Consider extracting this into a helper method like
`moveToEvictableAndRetry(name, node)` to reduce code duplication and improve
maintainability. This would make the error handling paths clearer and ensure
consistent behavior across all failure scenarios.
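A rough sketch of such a helper, derived from the duplicated block in the diff (the name `moveToEvictableAndRetry` is only illustrative):
```go
// moveToEvictableAndRetry parks a still-registered node in the evictable queue
// and kicks off the existing startEvictRetry loop; each failure path above
// could then collapse to "p.moveToEvictableAndRetry(node); return".
func (p *pub) moveToEvictableAndRetry(node *databasev1.Node) {
    name := node.Metadata.GetName()
    p.mu.Lock()
    if _, exists := p.registered[name]; !exists {
        p.mu.Unlock()
        return
    }
    md := schema.Metadata{
        TypeMeta: schema.TypeMeta{Kind: schema.KindNode},
        Spec:     node,
    }
    p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
    p.mu.Unlock()
    p.startEvictRetry(name, node, md)
}
```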
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]