This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b1d735ce implement dynamic reload of credential files (#734)
b1d735ce is described below

commit b1d735ceebc6f0477a8fa4b03947ab9acb7bb2ce
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Aug 26 13:17:12 2025 +0800

    implement dynamic reload of credential files (#734)
    
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 banyand/liaison/grpc/auth.go                   |  15 +-
 banyand/liaison/grpc/server.go                 |  34 ++-
 banyand/liaison/http/auth.go                   |  10 +-
 banyand/liaison/http/server.go                 |  12 +-
 banyand/liaison/pkg/auth/config.go             | 105 ---------
 banyand/liaison/pkg/auth/reloader.go           | 302 +++++++++++++++++++++++++
 banyand/liaison/pkg/auth/reloader_test.go      | 122 ++++++++++
 pkg/cmdsetup/liaison.go                        |   2 +-
 pkg/cmdsetup/standalone.go                     |   2 +-
 test/integration/standalone/other/auth_test.go | 189 +++++++++-------
 10 files changed, 574 insertions(+), 219 deletions(-)

diff --git a/banyand/liaison/grpc/auth.go b/banyand/liaison/grpc/auth.go
index 52ad21a5..8dc88265 100644
--- a/banyand/liaison/grpc/auth.go
+++ b/banyand/liaison/grpc/auth.go
@@ -28,47 +28,49 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
 )
 
-func authInterceptor(cfg *auth.Config) grpc.UnaryServerInterceptor {
+func authInterceptor(authReloader *auth.Reloader) grpc.UnaryServerInterceptor {
        return func(
                ctx context.Context,
                req interface{},
                info *grpc.UnaryServerInfo,
                handler grpc.UnaryHandler,
        ) (interface{}, error) {
+               cfg := authReloader.GetConfig()
                if !cfg.Enabled {
                        return handler(ctx, req)
                }
                if info.FullMethod == "/grpc.health.v1.Health/Check" && 
!cfg.HealthAuthEnabled {
                        return handler(ctx, req)
                }
-               if err := validateUser(ctx, cfg); err != nil {
+               if err := validateUser(ctx, authReloader); err != nil {
                        return nil, err
                }
                return handler(ctx, req)
        }
 }
 
-func authStreamInterceptor(cfg *auth.Config) grpc.StreamServerInterceptor {
+func authStreamInterceptor(authReloader *auth.Reloader) 
grpc.StreamServerInterceptor {
        return func(
                srv interface{},
                stream grpc.ServerStream,
                info *grpc.StreamServerInfo,
                handler grpc.StreamHandler,
        ) error {
+               cfg := authReloader.GetConfig()
                if !cfg.Enabled {
                        return handler(srv, stream)
                }
                if info.FullMethod == "/grpc.health.v1.Health/Check" && 
!cfg.HealthAuthEnabled {
                        return handler(srv, stream)
                }
-               if err := validateUser(stream.Context(), cfg); err != nil {
+               if err := validateUser(stream.Context(), authReloader); err != 
nil {
                        return err
                }
                return handler(srv, stream)
        }
 }
 
-func validateUser(ctx context.Context, cfg *auth.Config) error {
+func validateUser(ctx context.Context, authReloader *auth.Reloader) error {
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
                return status.Errorf(codes.Unauthenticated, "metadata is not 
provided")
@@ -84,9 +86,8 @@ func validateUser(ctx context.Context, cfg *auth.Config) 
error {
        username := usernames[0]
        password := passwords[0]
 
-       if !auth.CheckUsernameAndPassword(cfg, username, password) {
+       if !authReloader.CheckUsernameAndPassword(username, password) {
                return status.Errorf(codes.Unauthenticated, "Invalid 
credentials")
        }
-
        return nil
 }
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6a55dc65..fae1180b 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -66,7 +66,7 @@ var (
 // Server defines the gRPC server.
 type Server interface {
        run.Unit
-       GetAuthCfg() *auth.Config
+       GetAuthReloader() *auth.Reloader
        GetPort() *uint32
 }
 
@@ -96,12 +96,12 @@ type server struct {
        *propertyServer
        *indexRuleBindingRegistryServer
        *traceRegistryServer
+       authReloader             *auth.Reloader
        groupRepo                *groupRepo
        metrics                  *metrics
        certFile                 string
        keyFile                  string
        authConfigFile           string
-       cfg                      *auth.Config
        host                     string
        addr                     string
        accessLogRootPath        string
@@ -110,6 +110,7 @@ type server struct {
        port                     uint32
        enableIngestionAccessLog bool
        tls                      bool
+       healthAuthEnabled        bool
 }
 
 // NewServer returns a new gRPC server.
@@ -164,8 +165,8 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                traceRegistryServer: &traceRegistryServer{
                        schemaRegistry: schemaRegistry,
                },
-               schemaRepo: schemaRegistry,
-               cfg:        auth.InitCfg(),
+               schemaRepo:   schemaRegistry,
+               authReloader: auth.InitAuthReloader(),
        }
        s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC}
 
@@ -190,7 +191,7 @@ func (s *server) PreRun(_ context.Context) error {
                }
        }
        if s.authConfigFile != "" {
-               if err := auth.LoadConfig(s.cfg, s.authConfigFile); err != nil {
+               if err := s.authReloader.ConfigAuthReloader(s.authConfigFile, 
s.healthAuthEnabled, s.log); err != nil {
                        return err
                }
        }
@@ -239,9 +240,9 @@ func (s *server) GetPort() *uint32 {
        return &s.port
 }
 
-// GetAuthCfg returns auth cfg (for httpserver).
-func (s *server) GetAuthCfg() *auth.Config {
-       return s.cfg
+// GetAuthReloader returns auth reloader (for httpserver).
+func (s *server) GetAuthReloader() *auth.Reloader {
+       return s.authReloader
 }
 
 func (s *server) FlagSet() *run.FlagSet {
@@ -252,7 +253,7 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file")
        fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file")
        fs.StringVar(&s.authConfigFile, "auth-config-file", "", "Path to the 
authentication config file (YAML format)")
-       fs.BoolVar(&s.cfg.HealthAuthEnabled, "enable-health-auth", false, 
"enable authentication for health check")
+       fs.BoolVar(&s.healthAuthEnabled, "enable-health-auth", false, "enable 
authentication for health check")
        fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens")
        fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
        fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", 
false, "enable ingestion access log")
@@ -300,6 +301,14 @@ func (s *server) Serve() run.StopNotify {
                creds := credentials.NewTLS(tlsConfig)
                opts = append(opts, grpclib.Creds(creds))
        }
+       if s.authConfigFile != "" {
+               if err := s.authReloader.Start(); err != nil {
+                       s.log.Error().Err(err).Msg("Failed to start 
authReloader for gRPC")
+                       close(s.stopCh)
+                       return s.stopCh
+               }
+               s.log.Info().Str("authConfigFile", 
s.authConfigFile).Msg("Starting auth config file monitoring")
+       }
        grpcPanicRecoveryHandler := func(p any) (err error) {
                s.log.Error().Interface("panic", p).Str("stack", 
string(debug.Stack())).Msg("recovered from panic")
                s.metrics.totalPanic.Inc(1)
@@ -315,8 +324,8 @@ func (s *server) Serve() run.StopNotify {
                
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
        }
        if s.authConfigFile != "" {
-               streamChain = append(streamChain, authStreamInterceptor(s.cfg))
-               unaryChain = append(unaryChain, authInterceptor(s.cfg))
+               streamChain = append(streamChain, 
authStreamInterceptor(s.authReloader))
+               unaryChain = append(unaryChain, authInterceptor(s.authReloader))
        }
 
        opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
@@ -365,6 +374,9 @@ func (s *server) GracefulStop() {
        if s.tls && s.tlsReloader != nil {
                s.tlsReloader.Stop()
        }
+       if s.authConfigFile != "" && s.authReloader != nil {
+               s.authReloader.Stop()
+       }
        stopped := make(chan struct{})
        go func() {
                s.ser.GracefulStop()
diff --git a/banyand/liaison/http/auth.go b/banyand/liaison/http/auth.go
index 716e73f4..141b5846 100644
--- a/banyand/liaison/http/auth.go
+++ b/banyand/liaison/http/auth.go
@@ -29,14 +29,14 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
 )
 
-func authMiddleware(cfg *auth.Config) func(http.Handler) http.Handler {
+func authMiddleware(authReloader *auth.Reloader) func(http.Handler) 
http.Handler {
        return func(next http.Handler) http.Handler {
                return http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) {
                        if isStaticPath(r.URL.Path) {
                                next.ServeHTTP(w, r)
                                return
                        }
-
+                       cfg := authReloader.GetConfig()
                        if !cfg.Enabled {
                                next.ServeHTTP(w, r)
                                return
@@ -77,7 +77,7 @@ func authMiddleware(cfg *auth.Config) func(http.Handler) 
http.Handler {
 
                        username := parts[0]
                        password := parts[1]
-                       if !auth.CheckUsernameAndPassword(cfg, username, 
password) {
+                       if !authReloader.CheckUsernameAndPassword(username, 
password) {
                                http.Error(w, `{"error": "invalid 
credentials"}`, http.StatusUnauthorized)
                                return
                        }
@@ -105,9 +105,9 @@ func isStaticPath(path string) bool {
        return false
 }
 
-func buildGRPCContextForHealthCheck(cfg *auth.Config, r *http.Request) 
(context.Context, error) {
+func buildGRPCContextForHealthCheck(authReloader *auth.Reloader, r 
*http.Request) (context.Context, error) {
        ctx := r.Context()
-
+       cfg := authReloader.GetConfig()
        if cfg.HealthAuthEnabled {
                username := r.Header.Get("Grpc-Metadata-Username")
                password := r.Header.Get("Grpc-Metadata-Password")
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index b0a88241..20bac794 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -60,10 +60,10 @@ var (
 )
 
 // NewServer return a http service.
-func NewServer(cfg *auth.Config) Server {
+func NewServer(authReloader *auth.Reloader) Server {
        return &server{
-               stopCh: make(chan struct{}),
-               cfg:    cfg,
+               stopCh:       make(chan struct{}),
+               authReloader: authReloader,
        }
 }
 
@@ -90,7 +90,7 @@ type server struct {
        grpcAddr        string
        keyFile         string
        certFile        string
-       cfg             *auth.Config
+       authReloader    *auth.Reloader
        grpcCert        string
        grpcMu          sync.Mutex
        port            uint32
@@ -368,9 +368,9 @@ func (p *server) initGRPCClient() error {
        // This avoids the conflict when remounting to /api path
        newMux := chi.NewRouter()
 
-       newMux.Use(authMiddleware(p.cfg))
+       newMux.Use(authMiddleware(p.authReloader))
        newMux.Handle("/api/healthz", http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
-               ctx, err := buildGRPCContextForHealthCheck(p.cfg, r)
+               ctx, err := buildGRPCContextForHealthCheck(p.authReloader, r)
                if err != nil {
                        http.Error(w, err.Error(), http.StatusUnauthorized)
                        return
diff --git a/banyand/liaison/pkg/auth/config.go 
b/banyand/liaison/pkg/auth/config.go
deleted file mode 100644
index f595a93d..00000000
--- a/banyand/liaison/pkg/auth/config.go
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package auth provides configuration management and validation logic for 
authentication.
-package auth
-
-import (
-       "crypto/subtle"
-       "fmt"
-       "os"
-       "strings"
-
-       "sigs.k8s.io/yaml"
-)
-
-// Config AuthConfig.
-type Config struct {
-       Users             []User `yaml:"users"`
-       Enabled           bool   `yaml:"-"`
-       HealthAuthEnabled bool   `yaml:"-"`
-}
-
-// User details from config file.
-type User struct {
-       Username string `yaml:"username"`
-       Password string `yaml:"password"`
-}
-
-// InitCfg returns Config with default values.
-func InitCfg() *Config {
-       return &Config{
-               Enabled:           false,
-               HealthAuthEnabled: false,
-               Users:             []User{},
-       }
-}
-
-// LoadConfig implements the reading of the authentication configuration.
-func LoadConfig(cfg *Config, filePath string) error {
-       cfg.Enabled = true
-
-       info, err := os.Stat(filePath)
-       if err != nil {
-               return err
-       }
-       perm := info.Mode().Perm()
-       if perm != 0o600 {
-               return fmt.Errorf("config file %s has unsafe permissions: %o 
(expected 0600)", filePath, perm)
-       }
-
-       data, err := os.ReadFile(filePath)
-       if err != nil {
-               return err
-       }
-       err = yaml.Unmarshal(data, cfg)
-       if err != nil {
-               return err
-       }
-       return nil
-}
-
-// CheckUsernameAndPassword returns true if the provided username and password 
match any configured user.
-func CheckUsernameAndPassword(cfg *Config, username, password string) bool {
-       username = strings.TrimSpace(username)
-       password = strings.TrimSpace(password)
-
-       for _, user := range cfg.Users {
-               storedUsername := strings.TrimSpace(user.Username)
-               storedPassword := strings.TrimSpace(user.Password)
-
-               // Convert to []byte
-               usernameBytes := []byte(username)
-               storedUsernameBytes := []byte(storedUsername)
-               passwordBytes := []byte(password)
-               storedPasswordBytes := []byte(storedPassword)
-
-               // Length must match
-               if len(usernameBytes) != len(storedUsernameBytes) || 
len(passwordBytes) != len(storedPasswordBytes) {
-                       continue
-               }
-
-               // Use constant-time comparison
-               usernameMatch := subtle.ConstantTimeCompare(usernameBytes, 
storedUsernameBytes) == 1
-               passwordMatch := subtle.ConstantTimeCompare(passwordBytes, 
storedPasswordBytes) == 1
-
-               if usernameMatch && passwordMatch {
-                       return true
-               }
-       }
-       return false
-}
diff --git a/banyand/liaison/pkg/auth/reloader.go 
b/banyand/liaison/pkg/auth/reloader.go
new file mode 100644
index 00000000..c593e309
--- /dev/null
+++ b/banyand/liaison/pkg/auth/reloader.go
@@ -0,0 +1,302 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package auth provides configuration management and validation logic for 
authentication.
+package auth
+
+import (
+       "bytes"
+       "crypto/sha256"
+       "crypto/subtle"
+       "fmt"
+       "os"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/fsnotify/fsnotify"
+       "github.com/pkg/errors"
+       "sigs.k8s.io/yaml"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Config holds the configured users and the auth enable flags.
+type Config struct {
+       Users             []User `yaml:"users"`
+       Enabled           bool   `yaml:"-"`
+       HealthAuthEnabled bool   `yaml:"-"`
+}
+
+// User details from config file.
+type User struct {
+       Username string `yaml:"username"`
+       Password string `yaml:"password"`
+}
+
+// InitCfg returns Config with default values.
+func InitCfg() *Config {
+       return &Config{
+               Enabled:           false,
+               HealthAuthEnabled: false,
+               Users:             []User{},
+       }
+}
+
+// loadConfig reads the 0600-permission config file and applies its users.
+func (ar *Reloader) loadConfig(filePath string) error {
+       if filePath == "" {
+               return errors.New("configFile must be provided")
+       }
+       info, err := os.Stat(filePath)
+       if err != nil {
+               return err
+       }
+       perm := info.Mode().Perm()
+       if perm != 0o600 {
+               return fmt.Errorf("config file %s has unsafe permissions: %o 
(expected 0600)", filePath, perm)
+       }
+
+       data, err := os.ReadFile(filePath)
+       if err != nil {
+               return err
+       }
+       newCfg := InitCfg()
+       err = yaml.Unmarshal(data, newCfg)
+       if err != nil {
+               return err
+       }
+       ar.setAuthEnabled(true)
+       ar.setUsers(newCfg.Users)
+       return nil
+}
+
+// Reloader manages dynamic reloading of auth config.
+type Reloader struct {
+       debounceTimer  *time.Timer
+       updateCh       chan struct{}
+       configFile     string
+       Config         *Config
+       watcher        *fsnotify.Watcher
+       log            *logger.Logger
+       lastConfigHash []byte
+       mu             sync.RWMutex
+}
+
+// InitAuthReloader returns Reloader with default values.
+func InitAuthReloader() *Reloader {
+       return &Reloader{
+               Config: InitCfg(),
+       }
+}
+
+// ConfigAuthReloader loads the config file and populates the Reloader.
+func (ar *Reloader) ConfigAuthReloader(configFile string, healthAuthEnabled 
bool, log *logger.Logger) error {
+       if configFile == "" {
+               return errors.New("configFile must be provided")
+       }
+       if log == nil {
+               return errors.New("logger must not be nil")
+       }
+       err := ar.loadConfig(configFile)
+       if err != nil {
+               return errors.Wrapf(err, "failed to load initial auth config 
from %s", configFile)
+       }
+       cfg := ar.GetConfig()
+       ar.setHealthAuthEnabled(healthAuthEnabled)
+
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               return errors.Wrap(err, "failed to create fsnotify watcher")
+       }
+
+       ar.setConfig(cfg)
+       ar.configFile = configFile
+       ar.log = log
+       ar.watcher = watcher
+       ar.updateCh = make(chan struct{}, 1)
+       ar.lastConfigHash, _ = ar.computeFileHash(configFile)
+
+       return nil
+}
+
+// Start begins monitoring the config file.
+func (ar *Reloader) Start() error {
+       if err := ar.watcher.Add(ar.configFile); err != nil {
+               return errors.Wrapf(err, "failed to watch config file: %s", 
ar.configFile)
+       }
+
+       go ar.watchFiles()
+       return nil
+}
+
+// Stop stops the watcher.
+func (ar *Reloader) Stop() {
+       _ = ar.watcher.Close()
+}
+
+// GetConfig returns the shared config pointer; callers must not mutate it.
+func (ar *Reloader) GetConfig() *Config {
+       ar.mu.RLock()
+       defer ar.mu.RUnlock()
+       return ar.Config
+}
+
+func (ar *Reloader) setConfig(cfg *Config) {
+       ar.mu.Lock()
+       defer ar.mu.Unlock()
+       ar.Config = cfg
+}
+
+func (ar *Reloader) setHealthAuthEnabled(enabled bool) {
+       ar.mu.Lock()
+       defer ar.mu.Unlock()
+       ar.Config.HealthAuthEnabled = enabled
+}
+
+func (ar *Reloader) setAuthEnabled(enabled bool) {
+       ar.mu.Lock()
+       defer ar.mu.Unlock()
+       ar.Config.Enabled = enabled
+}
+
+func (ar *Reloader) setUsers(users []User) {
+       ar.mu.Lock()
+       defer ar.mu.Unlock()
+       ar.Config.Users = users
+}
+
+// CheckUsernameAndPassword returns true if the provided username and password 
match any configured user.
+func (ar *Reloader) CheckUsernameAndPassword(username, password string) bool {
+       cfg := ar.GetConfig()
+
+       username = strings.TrimSpace(username)
+       password = strings.TrimSpace(password)
+
+       for _, user := range cfg.Users {
+               storedUsername := strings.TrimSpace(user.Username)
+               storedPassword := strings.TrimSpace(user.Password)
+
+               // Convert to []byte
+               usernameBytes := []byte(username)
+               storedUsernameBytes := []byte(storedUsername)
+               passwordBytes := []byte(password)
+               storedPasswordBytes := []byte(storedPassword)
+
+               // Length must match
+               if len(usernameBytes) != len(storedUsernameBytes) || 
len(passwordBytes) != len(storedPasswordBytes) {
+                       continue
+               }
+
+               // Use constant-time comparison
+               usernameMatch := subtle.ConstantTimeCompare(usernameBytes, 
storedUsernameBytes) == 1
+               passwordMatch := subtle.ConstantTimeCompare(passwordBytes, 
storedPasswordBytes) == 1
+
+               if usernameMatch && passwordMatch {
+                       return true
+               }
+       }
+       return false
+}
+
+// watchFiles listens for config changes.
+func (ar *Reloader) watchFiles() {
+       for {
+               if ar.watcher == nil {
+                       ar.log.Error().Msg("watcher is nil, exiting watchFiles")
+                       return
+               }
+               select {
+               case event, ok := <-ar.watcher.Events:
+                       if !ok {
+                               return
+                       }
+                       ar.log.Debug().Str("file", event.Name).Str("op", 
event.Op.String()).Msg("Detected auth file event")
+                       if 
event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) != 0 {
+                               ar.scheduleReloadAttempt()
+                       }
+               case err, ok := <-ar.watcher.Errors:
+                       if !ok {
+                               return
+                       }
+                       ar.log.Error().Err(err).Msg("watcher error")
+               }
+       }
+}
+
+// scheduleReloadAttempt debounces reload attempts.
+func (ar *Reloader) scheduleReloadAttempt() {
+       if ar.debounceTimer == nil {
+               ar.debounceTimer = time.AfterFunc(500*time.Millisecond, 
ar.tryReload)
+       } else {
+               ar.debounceTimer.Reset(500 * time.Millisecond)
+       }
+}
+
+// tryReload reloads config if changed.
+func (ar *Reloader) tryReload() {
+       changed, newHash, err := ar.checkContentChanged()
+       if err != nil {
+               ar.log.Error().Err(err).Msg("error checking config change")
+               return
+       }
+       if !changed {
+               return
+       }
+
+       err = ar.loadConfig(ar.configFile)
+       if err != nil {
+               ar.log.Error().Err(err).Msg("failed to reload config")
+               return
+       }
+
+       ar.mu.Lock()
+       ar.lastConfigHash = newHash
+       ar.mu.Unlock()
+
+       // notify
+       select {
+       case ar.updateCh <- struct{}{}:
+       default:
+       }
+       ar.log.Info().Msg("auth config updated in memory")
+}
+
+// checkContentChanged compares file hash.
+func (ar *Reloader) checkContentChanged() (bool, []byte, error) {
+       currentHash, err := ar.computeFileHash(ar.configFile)
+       if err != nil {
+               return false, nil, err
+       }
+       return !bytes.Equal(ar.lastConfigHash, currentHash), currentHash, nil
+}
+
+// computeFileHash computes sha256 of file.
+func (ar *Reloader) computeFileHash(filePath string) ([]byte, error) {
+       content, err := os.ReadFile(filePath)
+       if err != nil {
+               return nil, err
+       }
+       h := sha256.New()
+       h.Write(content)
+       return h.Sum(nil), nil
+}
+
+// GetUpdateChannel allows external consumers to watch for updates.
+func (ar *Reloader) GetUpdateChannel() <-chan struct{} {
+       return ar.updateCh
+}
diff --git a/banyand/liaison/pkg/auth/reloader_test.go 
b/banyand/liaison/pkg/auth/reloader_test.go
new file mode 100644
index 00000000..176ead04
--- /dev/null
+++ b/banyand/liaison/pkg/auth/reloader_test.go
@@ -0,0 +1,122 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func writeConfigFile(t *testing.T, dir, filename, content string) string {
+       t.Helper()
+       path := filepath.Join(dir, filename)
+       err := os.WriteFile(path, []byte(content), 0o600)
+       if err != nil {
+               t.Fatalf("failed to write config file: %v", err)
+       }
+       return path
+}
+
+func TestLoadConfigAndAuthCheck(t *testing.T) {
+       dir := t.TempDir()
+       configYAML := `
+users:
+  - username: "alice"
+    password: "secret"
+  - username: "bob"
+    password: "hunter2"
+`
+       path := writeConfigFile(t, dir, "auth.yaml", configYAML)
+       // init reloader
+       ar := InitAuthReloader()
+       err := ar.loadConfig(path)
+       if err != nil {
+               t.Fatalf("expected loadConfig success, got error: %v", err)
+       }
+       cfg := ar.GetConfig()
+       if len(cfg.Users) != 2 {
+               t.Fatalf("expected 2 users, got %d", len(cfg.Users))
+       }
+       log := logger.GetLogger("auth-test")
+       if err := ar.ConfigAuthReloader(path, false, log); err != nil {
+               t.Fatalf("ConfigAuthReloader failed: %v", err)
+       }
+
+       if !ar.CheckUsernameAndPassword("alice", "secret") {
+               t.Errorf("expected alice/secret to be valid")
+       }
+       if ar.CheckUsernameAndPassword("alice", "wrong") {
+               t.Errorf("expected alice/wrong to be invalid")
+       }
+       if ar.CheckUsernameAndPassword("notexist", "secret") {
+               t.Errorf("expected non-existent user to fail")
+       }
+}
+
+func TestReloaderUpdatesOnFileChange(t *testing.T) {
+       dir := t.TempDir()
+       initialYAML := `
+users:
+  - username: "alice"
+    password: "secret"
+`
+       path := writeConfigFile(t, dir, "auth.yaml", initialYAML)
+
+       ar := InitAuthReloader()
+       log := logger.GetLogger("auth-test")
+       if err := ar.ConfigAuthReloader(path, false, log); err != nil {
+               t.Fatalf("ConfigAuthReloader failed: %v", err)
+       }
+
+       if err := ar.Start(); err != nil {
+               t.Fatalf("failed to start reloader: %v", err)
+       }
+       defer ar.Stop()
+
+       if !ar.CheckUsernameAndPassword("alice", "secret") {
+               t.Fatalf("expected alice/secret to be valid before update")
+       }
+
+       updatedYAML := `
+users:
+  - username: "bob"
+    password: "hunter2"
+`
+       err := os.WriteFile(path, []byte(updatedYAML), 0o600)
+       if err != nil {
+               t.Fatalf("failed to update config file: %v", err)
+       }
+
+       select {
+       case <-ar.GetUpdateChannel():
+               // ok
+       case <-time.After(2 * time.Second):
+               t.Fatalf("timed out waiting for update channel notification")
+       }
+
+       if ar.CheckUsernameAndPassword("alice", "secret") {
+               t.Errorf("alice should no longer be valid after update")
+       }
+       if !ar.CheckUsernameAndPassword("bob", "hunter2") {
+               t.Errorf("expected bob/hunter2 to be valid after update")
+       }
+}
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 97f3f091..5c42365e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -81,7 +81,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                PropertyNodeRegistry:       
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client, 
propertyNodeSel),
        }, metricSvc)
        profSvc := observability.NewProfService()
-       httpServer := http.NewServer(grpcServer.GetAuthCfg())
+       httpServer := http.NewServer(grpcServer.GetAuthReloader())
        var units []run.Unit
        units = append(units, runners...)
        units = append(units,
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 9bc005aa..90b9293e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -76,7 +76,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
                PropertyNodeRegistry:       nr,
        }, metricSvc)
        profSvc := observability.NewProfService()
-       httpServer := http.NewServer(grpcServer.GetAuthCfg())
+       httpServer := http.NewServer(grpcServer.GetAuthReloader())
 
        var units []run.Unit
        units = append(units, runners...)
diff --git a/test/integration/standalone/other/auth_test.go 
b/test/integration/standalone/other/auth_test.go
index 20cc06bc..7ce4eef0 100644
--- a/test/integration/standalone/other/auth_test.go
+++ b/test/integration/standalone/other/auth_test.go
@@ -54,6 +54,8 @@ var _ = g.Describe("Query service_cpm_minute with 
authentication", func() {
        var goods []gleak.Goroutine
        var grpcAddr, httpAddr string
        var testUser serverAuth.User
+       var authCfgFile string
+       var httpClient *http.Client
 
        g.BeforeEach(func() {
                // load server config.yaml
@@ -62,7 +64,7 @@ var _ = g.Describe("Query service_cpm_minute with 
authentication", func() {
                tempServerCfg := filepath.Join(os.TempDir(), 
fmt.Sprintf(".bydb-%s.yaml", uuid.New().String()))
                err = os.WriteFile(tempServerCfg, cfgBytes, 0o600)
                gm.Expect(err).NotTo(gm.HaveOccurred())
-               authCfgFile := tempServerCfg
+               authCfgFile = tempServerCfg
                info, _ := os.Stat(authCfgFile)
                gm.Expect(info.Mode().Perm()).To(gm.Equal(os.FileMode(0o600)))
 
@@ -85,6 +87,7 @@ var _ = g.Describe("Query service_cpm_minute with 
authentication", func() {
                baseTime = time.Unix(0, ns-ns%int64(time.Minute))
                interval = 500 * time.Millisecond
                casesMeasureData.WriteWithAuth(conn, "service_cpm_minute", 
"sw_metric", "service_cpm_minute_data.json", baseTime, interval, 
testUser.Username, testUser.Password)
+               httpClient = &http.Client{}
                goods = gleak.Goroutines()
        })
 
@@ -95,116 +98,136 @@ var _ = g.Describe("Query service_cpm_minute with 
authentication", func() {
        })
 
        g.It("grpc query and healthcheck with correct username and password", 
func() {
-               gm.Eventually(func(innerGm gm.Gomega) {
-                       casesMeasureData.VerifyFnWithAuth(innerGm, 
helpers.SharedContext{
-                               Connection: conn,
-                               BaseTime:   baseTime,
-                       },
-                               helpers.Args{Input: "all", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute},
-                               testUser.Username, testUser.Password)
-               }, flags.EventuallyTimeout).Should(gm.Succeed())
-
+               verifyGRPCQuery(conn, baseTime, testUser.Username, 
testUser.Password, true)
                opts := make([]grpclib.DialOption, 0, 1)
                opts, err := grpchelper.SecureOptions(opts, false, true, "")
                gm.Expect(err).ToNot(gm.HaveOccurred())
-               gm.Eventually(func() error {
-                       return helpers.HealthCheckWithAuth(grpcAddr, 
10*time.Second, 10*time.Second, testUser.Username, testUser.Password, opts...)()
-               }, flags.EventuallyTimeout).Should(gm.Succeed())
+               checkGRPCHealth(grpcAddr, testUser.Username, testUser.Password, 
opts, true)
        })
 
        g.It("grpc query and healthcheck with wrong username and password", 
func() {
-               gm.Eventually(func(innerGm gm.Gomega) {
-                       casesMeasureData.VerifyFnWithAuth(innerGm, 
helpers.SharedContext{
-                               Connection: conn,
-                               BaseTime:   baseTime,
-                       },
-                               helpers.Args{Input: "all", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute},
-                               testUser.Username, testUser.Password+"wrong")
-               }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
-
+               verifyGRPCQuery(conn, baseTime, testUser.Username, 
testUser.Password+"wrong", false)
                opts := make([]grpclib.DialOption, 0, 1)
                opts, err := grpchelper.SecureOptions(opts, false, true, "")
                gm.Expect(err).ToNot(gm.HaveOccurred())
-               gm.Eventually(func() error {
-                       return helpers.HealthCheckWithAuth(grpcAddr, 
10*time.Second, 10*time.Second, testUser.Username, testUser.Password+"wrong", 
opts...)()
-               }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+               checkGRPCHealth(grpcAddr, testUser.Username, 
testUser.Password+"wrong", opts, false)
        })
 
        g.It("http query and healthcheck with correct username and password", 
func() {
-               httpClient := &http.Client{}
                groupListURL := 
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
                gm.Eventually(func() error {
-                       req, err := http.NewRequest(http.MethodGet, 
groupListURL, nil)
-                       if err != nil {
-                               return err
-                       }
-                       req.Header.Add("Authorization", 
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password))
-                       resp, respErr := httpClient.Do(req)
-                       if respErr != nil {
-                               return respErr
-                       }
-                       defer resp.Body.Close()
-                       if resp.StatusCode != http.StatusOK {
-                               return fmt.Errorf("unexpected status code: %d", 
resp.StatusCode)
-                       }
-                       return nil
+                       return doHTTPCheck(httpClient, groupListURL, 
testUser.Username, testUser.Password, true)
                }, flags.EventuallyTimeout).Should(gm.Succeed())
 
                healthCheckURL := fmt.Sprintf("http://%s/api/healthz", httpAddr)
                gm.Eventually(func() error {
-                       req, err := http.NewRequest(http.MethodGet, 
healthCheckURL, nil)
-                       if err != nil {
-                               return err
-                       }
-                       req.Header.Add("Authorization", 
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password))
-                       resp, respErr := httpClient.Do(req)
-                       if respErr != nil {
-                               return respErr
-                       }
-                       defer resp.Body.Close()
-                       if resp.StatusCode != http.StatusOK {
-                               return fmt.Errorf("unexpected status code: %d", 
resp.StatusCode)
-                       }
-                       return nil
+                       return doHTTPCheck(httpClient, healthCheckURL, 
testUser.Username, testUser.Password, true)
                }, flags.EventuallyTimeout).Should(gm.Succeed())
        })
 
        g.It("http query and healthcheck with wrong username and password", 
func() {
-               httpClient := &http.Client{}
                groupListURL := 
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
                gm.Eventually(func() error {
-                       req, err := http.NewRequest(http.MethodGet, 
groupListURL, nil)
-                       if err != nil {
-                               return err
-                       }
-                       req.Header.Add("Authorization", 
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password+"wrong"))
-                       resp, respErr := httpClient.Do(req)
-                       if respErr != nil {
-                               return respErr
-                       }
-                       defer resp.Body.Close()
-                       if resp.StatusCode != http.StatusOK {
-                               return fmt.Errorf("unexpected status code: %d", 
resp.StatusCode)
-                       }
-                       return nil
+                       return doHTTPCheck(httpClient, groupListURL, 
testUser.Username, testUser.Password+"wrong", true)
                }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
 
                healthCheckURL := fmt.Sprintf("http://%s/api/healthz", httpAddr)
                gm.Eventually(func() error {
-                       req, err := http.NewRequest(http.MethodGet, 
healthCheckURL, nil)
-                       if err != nil {
-                               return err
-                       }
-                       req.Header.Add("Authorization", 
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password+"wrong"))
-                       resp, respErr := httpClient.Do(req)
-                       if respErr != nil {
-                               return respErr
-                       }
-                       defer resp.Body.Close()
-                       if resp.StatusCode != http.StatusOK {
-                               return fmt.Errorf("unexpected status code: %d", 
resp.StatusCode)
-                       }
-                       return nil
+                       return doHTTPCheck(httpClient, healthCheckURL, 
testUser.Username, testUser.Password+"wrong", true)
+               }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+       })
+
+       g.It("queries auth config reload with both gRPC and HTTP", func() {
+               // query with original username and password
+               groupListURL := 
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
+               gm.Eventually(func() error {
+                       return doHTTPCheck(httpClient, groupListURL, 
testUser.Username, testUser.Password, true)
+               }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+               // 1. Generate a new username/password and modify the original 
auth configuration file
+               newUser := serverAuth.User{Username: "updated-user", Password: 
"updated-pass"}
+               cfgBytes, err := os.ReadFile(authCfgFile)
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+               var cfg serverAuth.Config
+               err = yaml.Unmarshal(cfgBytes, &cfg)
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+               cfg.Users = []serverAuth.User{newUser}
+               newCfgBytes, err := yaml.Marshal(cfg)
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+               err = os.WriteFile(authCfgFile, newCfgBytes, 0o600)
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+
+               // 2. Waiting for server to reload configuration
+               time.Sleep(2 * time.Second)
+
+               // 3. New gRPC connections use the updated username/password
+               newConn, err := grpchelper.ConnWithAuth(grpcAddr, 
10*time.Second,
+                       newUser.Username, newUser.Password,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+               defer newConn.Close()
+               verifyGRPCQuery(newConn, baseTime, newUser.Username, 
newUser.Password, true)
+
+               // 4. Verify that old username/password cannot access gRPC
+               _, err = grpchelper.ConnWithAuth(grpcAddr, 10*time.Second,
+                       testUser.Username, testUser.Password,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(err).To(gm.HaveOccurred())
+
+               // 5. New HTTP clients use updated username/password
+               groupListURL = 
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
+               gm.Eventually(func() error {
+                       return doHTTPCheck(httpClient, groupListURL, 
newUser.Username, newUser.Password, true)
+               }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+               // 6. Verification of old HTTP username/password access failed
+               gm.Eventually(func() error {
+                       return doHTTPCheck(httpClient, groupListURL, 
testUser.Username, testUser.Password, true)
                }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
        })
 })
+
+//nolint:unparam
+func doHTTPCheck(client *http.Client, url, username, password string, expectOK 
bool) error {
+       req, err := http.NewRequest(http.MethodGet, url, nil)
+       if err != nil {
+               return err
+       }
+       req.Header.Add("Authorization", auth.GenerateBasicAuthHeader(username, 
password))
+       resp, err := client.Do(req)
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       if expectOK && resp.StatusCode != http.StatusOK {
+               return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+       }
+       if !expectOK && resp.StatusCode == http.StatusOK {
+               return fmt.Errorf("expected failure but got %d", 
resp.StatusCode)
+       }
+       return nil
+}
+
+func verifyGRPCQuery(conn *grpclib.ClientConn, baseTime time.Time, username, 
password string, expectOK bool) {
+       checkFn := func(innerGm gm.Gomega) {
+               casesMeasureData.VerifyFnWithAuth(innerGm, 
helpers.SharedContext{
+                       Connection: conn,
+                       BaseTime:   baseTime,
+               }, helpers.Args{Input: "all", Duration: 25 * time.Minute, 
Offset: -20 * time.Minute},
+                       username, password)
+       }
+       if expectOK {
+               gm.Eventually(checkFn, 
flags.EventuallyTimeout).Should(gm.Succeed())
+       } else {
+               gm.Eventually(checkFn, 
flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+       }
+}
+
+func checkGRPCHealth(grpcAddr string, username, password string, opts 
[]grpclib.DialOption, expectOK bool) {
+       checkFn := helpers.HealthCheckWithAuth(grpcAddr, 10*time.Second, 
10*time.Second, username, password, opts...)
+       if expectOK {
+               gm.Eventually(checkFn, 
flags.EventuallyTimeout).Should(gm.Succeed())
+       } else {
+               gm.Eventually(checkFn, 
flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+       }
+}


Reply via email to