This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c647e99 internal TLS reload (#882)
8c647e99 is described below

commit 8c647e999b4f31f26de94007a384b1f45fb15b4d
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Dec 8 14:23:02 2025 +0800

    internal TLS reload (#882)
    
    
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 banyand/queue/pub/pub.go    | 100 +++++++++++++++++++++++++++++++++++++++++++-
 banyand/queue/sub/server.go |  50 +++++++++++++++++++---
 docs/operation/security.md  |  22 ++++++++--
 3 files changed, 161 insertions(+), 11 deletions(-)

diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index fd69d37a..26d5320f 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -30,6 +30,7 @@ import (
        "go.uber.org/multierr"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
 
@@ -47,6 +48,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
 
 // ChunkedSyncClientConfig configures chunked sync client behavior.
@@ -75,6 +77,7 @@ type pub struct {
        writableProbe   map[string]map[string]struct{}
        cbStates        map[string]*circuitState
        caCertPath      string
+       caCertReloader  *pkgtls.Reloader
        prefix          string
        retryPolicy     string
        allowedRoles    []databasev1.Role
@@ -100,7 +103,7 @@ func (p *pub) FlagSet() *run.FlagSet {
 func (p *pub) Validate() error {
        // simple sanity‑check: if TLS is on, a CA bundle must be provided
        if p.tlsEnabled && p.caCertPath == "" {
-               return fmt.Errorf("TLS is enabled (--internal-tls), but no CA 
certificate file was provided (--internal-ca-cert is required)")
+               return fmt.Errorf("TLS is enabled (--data-client-tls), but no 
CA certificate file was provided (--data-client-ca-cert is required)")
        }
        return nil
 }
@@ -110,6 +113,11 @@ func (p *pub) Register(topic bus.Topic, handler 
schema.EventHandler) {
 }
 
 func (p *pub) GracefulStop() {
+       // Stop CA certificate reloader if enabled
+       if p.caCertReloader != nil {
+               p.caCertReloader.Stop()
+       }
+
        p.mu.Lock()
        defer p.mu.Unlock()
        for i := range p.evictable {
@@ -126,6 +134,35 @@ func (p *pub) GracefulStop() {
 
 // Serve implements run.Service.
 func (p *pub) Serve() run.StopNotify {
+       // Start CA certificate reloader if enabled
+       if p.caCertReloader != nil {
+               if err := p.caCertReloader.Start(); err != nil {
+                       p.log.Error().Err(err).Msg("Failed to start CA 
certificate reloader")
+                       stopCh := p.closer.CloseNotify()
+                       return stopCh
+               }
+               p.log.Info().Str("caCertPath", p.caCertPath).Msg("Started CA 
certificate file monitoring")
+
+               // Listen for certificate update events
+               certUpdateCh := p.caCertReloader.GetUpdateChannel()
+               stopCh := p.closer.CloseNotify()
+               if p.closer.AddRunning() {
+                       go func() {
+                               defer p.closer.Done()
+                               for {
+                                       select {
+                                       case <-certUpdateCh:
+                                               p.log.Info().Msg("CA 
certificate updated, reconnecting clients")
+                                               p.reconnectAllClients()
+                                       case <-stopCh:
+                                               return
+                                       }
+                               }
+                       }()
+               }
+               return stopCh
+       }
+
        return p.closer.CloseNotify()
 }
 
@@ -332,6 +369,17 @@ func (p *pub) PreRun(context.Context) error {
        }
 
        p.log = logger.GetLogger("server-queue-pub-" + p.prefix)
+
+       // Initialize CA certificate reloader if TLS is enabled and CA cert 
path is provided
+       if p.tlsEnabled && p.caCertPath != "" {
+               var err error
+               p.caCertReloader, err = 
pkgtls.NewClientCertReloader(p.caCertPath, p.log)
+               if err != nil {
+                       return errors.Wrapf(err, "failed to initialize CA 
certificate reloader for %s", p.prefix)
+               }
+               p.log.Info().Str("caCertPath", p.caCertPath).Msg("Initialized 
CA certificate reloader")
+       }
+
        return nil
 }
 
@@ -441,6 +489,23 @@ func isFailoverError(err error) bool {
 }
 
 func (p *pub) getClientTransportCredentials() ([]grpc.DialOption, error) {
+       if !p.tlsEnabled {
+               return grpchelper.SecureOptions(nil, false, false, "")
+       }
+
+       // Use reloader if available (for dynamic reloading)
+       if p.caCertReloader != nil {
+               // The server name is not derived from the connection yet; an 
empty name is passed for now
+               // The actual server name will be validated by the TLS handshake
+               tlsConfig, err := p.caCertReloader.GetClientTLSConfig("")
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get TLS config from 
reloader: %w", err)
+               }
+               creds := credentials.NewTLS(tlsConfig)
+               return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, 
nil
+       }
+
+       // Fallback to static file reading if reloader is not available
        opts, err := grpchelper.SecureOptions(nil, p.tlsEnabled, false, 
p.caCertPath)
        if err != nil {
                return nil, fmt.Errorf("failed to load TLS config: %w", err)
@@ -448,6 +513,39 @@ func (p *pub) getClientTransportCredentials() 
([]grpc.DialOption, error) {
        return opts, nil
 }
 
+// reconnectAllClients reconnects all active clients when CA certificate is 
updated.
+func (p *pub) reconnectAllClients() {
+       // Collect nodes and close connections
+       p.mu.Lock()
+       nodesToReconnect := make([]schema.Metadata, 0, len(p.registered))
+       for name, node := range p.registered {
+               // Handle evictable nodes: close channel and remove from 
evictable
+               if en, ok := p.evictable[name]; ok {
+                       close(en.c)
+                       delete(p.evictable, name)
+               }
+               // Handle active nodes: close connection and remove from active
+               if client, ok := p.active[name]; ok {
+                       _ = client.conn.Close()
+                       delete(p.active, name)
+                       p.deleteClient(client.md)
+               }
+               md := schema.Metadata{
+                       TypeMeta: schema.TypeMeta{
+                               Kind: schema.KindNode,
+                       },
+                       Spec: node,
+               }
+               nodesToReconnect = append(nodesToReconnect, md)
+       }
+       p.mu.Unlock()
+
+       // Reconnect with new credentials
+       for _, md := range nodesToReconnect {
+               p.OnAddOrUpdate(md)
+       }
+}
+
 // NewChunkedSyncClient implements queue.Client.
 func (p *pub) NewChunkedSyncClient(node string, chunkSize uint32) 
(queue.ChunkedSyncClient, error) {
        return p.NewChunkedSyncClientWithConfig(node, &ChunkedSyncClientConfig{
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index c1949b20..05ca5507 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -51,6 +51,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
 )
 
 const defaultRecvSize = 10 << 20
@@ -72,6 +73,7 @@ type server struct {
        streamv1.UnimplementedStreamServiceServer
        databasev1.UnimplementedSnapshotServiceServer
        creds               credentials.TransportCredentials
+       tlsReloader         *pkgtls.Reloader
        omr                 observability.MetricsRegistry
        metrics             *metrics
        ser                 *grpclib.Server
@@ -126,6 +128,17 @@ func NewServerWithPorts(omr observability.MetricsRegistry, 
flagNamePrefix string
 func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("server-queue-sub")
        s.metrics = newMetrics(s.omr.With(queueSubScope))
+
+       // Initialize TLS reloader if TLS is enabled
+       if s.tls {
+               var err error
+               s.tlsReloader, err = pkgtls.NewReloader(s.certFile, s.keyFile, 
s.log)
+               if err != nil {
+                       return errors.Wrap(err, "failed to initialize TLS 
reloader for queue server")
+               }
+               s.log.Info().Str("certFile", s.certFile).Str("keyFile", 
s.keyFile).Msg("Initialized TLS reloader for queue server")
+       }
+
        return nil
 }
 
@@ -188,18 +201,37 @@ func (s *server) Validate() error {
        if s.keyFile == "" {
                return errServerKey
        }
-       creds, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
-       if errTLS != nil {
-               return errors.Wrap(errTLS, "failed to load cert and key")
-       }
-       s.creds = creds
+       // TLS reloader will be initialized in PreRun, so we don't need to load 
credentials here
+       // The credentials will be loaded dynamically when the server starts
        return nil
 }
 
 func (s *server) Serve() run.StopNotify {
        var opts []grpclib.ServerOption
        if s.tls {
-               opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
+               // Start TLS reloader if enabled
+               if s.tlsReloader != nil {
+                       if err := s.tlsReloader.Start(); err != nil {
+                               s.log.Error().Err(err).Msg("Failed to start TLS 
reloader for queue server")
+                               stopCh := make(chan struct{})
+                               close(stopCh)
+                               return stopCh
+                       }
+                       s.log.Info().Str("certFile", s.certFile).Str("keyFile", 
s.keyFile).Msg("Started TLS file monitoring for queue server")
+                       tlsConfig := s.tlsReloader.GetTLSConfig()
+                       creds := credentials.NewTLS(tlsConfig)
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               } else {
+                       // Fallback to static loading if reloader is not 
available
+                       creds, errTLS := 
credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
+                       if errTLS != nil {
+                               s.log.Error().Err(errTLS).Msg("Failed to load 
TLS credentials")
+                               stopCh := make(chan struct{})
+                               close(stopCh)
+                               return stopCh
+                       }
+                       opts = []grpclib.ServerOption{grpclib.Creds(creds)}
+               }
        }
        grpcPanicRecoveryHandler := func(p any) (err error) {
                s.log.Error().Interface("panic", p).Str("stack", 
string(debug.Stack())).Msg("recovered from panic")
@@ -290,6 +322,12 @@ func (s *server) Serve() run.StopNotify {
 
 func (s *server) GracefulStop() {
        s.log.Info().Msg("stopping")
+
+       // Stop TLS reloader if enabled
+       if s.tlsReloader != nil {
+               s.tlsReloader.Stop()
+       }
+
        stopped := make(chan struct{})
        s.clientCloser()
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
diff --git a/docs/operation/security.md b/docs/operation/security.md
index 748e8dd4..59ad6c7e 100644
--- a/docs/operation/security.md
+++ b/docs/operation/security.md
@@ -123,19 +123,33 @@ BanyanDB supports enabling TLS for the internal gRPC 
queue between liaison and d
 
 The following flags are used to configure internal TLS:
 
-- `--internal-tls`: Enable TLS on the internal queue client inside Liaison; if 
false, the queue uses plain TCP.
-- `--internal-ca-cert <path>`: PEM‑encoded CA (or bundle) that the queue 
client uses to verify Data‑Node server certificates.
+- `--data-client-tls`: Enable TLS on the internal queue client inside Liaison; 
if false, the queue uses plain TCP.
+- `--data-client-ca-cert`: PEM‑encoded CA (or bundle) that the queue client 
uses to verify Data‑Node server certificates.
 
 Each Liaison/Data process still advertises its certificate with the public 
flags (`--tls`, `--cert-file`, `--key-file`). The same certificate/key pair can 
be reused for both external traffic and the internal queue.
 
 **Example: Enable internal TLS between liaison and data nodes**
 
 ```shell
-banyand liaison --internal-tls=true --internal-ca-cert=ca.crt --tls=true 
--cert-file=server.crt --key-file=server.key
+banyand liaison --data-client-tls=true --data-client-ca-cert=ca.crt --tls=true 
--cert-file=server.crt --key-file=server.key
 banyand data --tls=true --cert-file=server.crt --key-file=server.key
 ```
 
-> Note: The `--internal-ca-cert` should point to the CA certificate used to 
sign the data node's server certificate.
+> Note: 
+> - The `--data-client-ca-cert` should point to the CA certificate used to 
sign the data node's server certificate.
+> - Data nodes act as servers and do not need a CA certificate to connect to 
liaison nodes (liaison nodes connect to data nodes, not vice versa).
+> - The flag names use the prefix "data" because liaison nodes connect to data 
nodes. The actual flag names are `--data-client-tls` and 
`--data-client-ca-cert`.
+
+**Dynamic Certificate Reloading**
+
+All certificates used for internal TLS can be reloaded automatically when they 
are updated:
+
+- **Liaison nodes**:
+  - CA certificate file (`--data-client-ca-cert`): Can be updated, and the 
server will automatically reload it and reconnect all clients to data nodes 
with the new certificate.
+  - Server certificate files (`--cert-file`, `--key-file`): Can be updated, 
and the server will automatically reload them. These certificates are used for 
external client connections and can also be reused for internal queue 
communication.
+- **Data nodes**: The server certificate files (`--cert-file`, `--key-file`) 
can be updated, and the server will automatically reload them without requiring 
a restart.
+
+You can either update the files in place or recreate them, and the servers will 
automatically reload them in both cases.
 
 ## Authorization
 

Reply via email to