This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b1d735ce implement dynamic reload of credential files (#734)
b1d735ce is described below
commit b1d735ceebc6f0477a8fa4b03947ab9acb7bb2ce
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Aug 26 13:17:12 2025 +0800
implement dynamic reload of credential files (#734)
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
banyand/liaison/grpc/auth.go | 15 +-
banyand/liaison/grpc/server.go | 34 ++-
banyand/liaison/http/auth.go | 10 +-
banyand/liaison/http/server.go | 12 +-
banyand/liaison/pkg/auth/config.go | 105 ---------
banyand/liaison/pkg/auth/reloader.go | 302 +++++++++++++++++++++++++
banyand/liaison/pkg/auth/reloader_test.go | 122 ++++++++++
pkg/cmdsetup/liaison.go | 2 +-
pkg/cmdsetup/standalone.go | 2 +-
test/integration/standalone/other/auth_test.go | 189 +++++++++-------
10 files changed, 574 insertions(+), 219 deletions(-)
diff --git a/banyand/liaison/grpc/auth.go b/banyand/liaison/grpc/auth.go
index 52ad21a5..8dc88265 100644
--- a/banyand/liaison/grpc/auth.go
+++ b/banyand/liaison/grpc/auth.go
@@ -28,47 +28,49 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
)
-func authInterceptor(cfg *auth.Config) grpc.UnaryServerInterceptor {
+func authInterceptor(authReloader *auth.Reloader) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
+ cfg := authReloader.GetConfig()
if !cfg.Enabled {
return handler(ctx, req)
}
if info.FullMethod == "/grpc.health.v1.Health/Check" &&
!cfg.HealthAuthEnabled {
return handler(ctx, req)
}
- if err := validateUser(ctx, cfg); err != nil {
+ if err := validateUser(ctx, authReloader); err != nil {
return nil, err
}
return handler(ctx, req)
}
}
-func authStreamInterceptor(cfg *auth.Config) grpc.StreamServerInterceptor {
+func authStreamInterceptor(authReloader *auth.Reloader)
grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
+ cfg := authReloader.GetConfig()
if !cfg.Enabled {
return handler(srv, stream)
}
if info.FullMethod == "/grpc.health.v1.Health/Check" &&
!cfg.HealthAuthEnabled {
return handler(srv, stream)
}
- if err := validateUser(stream.Context(), cfg); err != nil {
+ if err := validateUser(stream.Context(), authReloader); err !=
nil {
return err
}
return handler(srv, stream)
}
}
-func validateUser(ctx context.Context, cfg *auth.Config) error {
+func validateUser(ctx context.Context, authReloader *auth.Reloader) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Errorf(codes.Unauthenticated, "metadata is not
provided")
@@ -84,9 +86,8 @@ func validateUser(ctx context.Context, cfg *auth.Config)
error {
username := usernames[0]
password := passwords[0]
- if !auth.CheckUsernameAndPassword(cfg, username, password) {
+ if !authReloader.CheckUsernameAndPassword(username, password) {
return status.Errorf(codes.Unauthenticated, "Invalid
credentials")
}
-
return nil
}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6a55dc65..fae1180b 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -66,7 +66,7 @@ var (
// Server defines the gRPC server.
type Server interface {
run.Unit
- GetAuthCfg() *auth.Config
+ GetAuthReloader() *auth.Reloader
GetPort() *uint32
}
@@ -96,12 +96,12 @@ type server struct {
*propertyServer
*indexRuleBindingRegistryServer
*traceRegistryServer
+ authReloader *auth.Reloader
groupRepo *groupRepo
metrics *metrics
certFile string
keyFile string
authConfigFile string
- cfg *auth.Config
host string
addr string
accessLogRootPath string
@@ -110,6 +110,7 @@ type server struct {
port uint32
enableIngestionAccessLog bool
tls bool
+ healthAuthEnabled bool
}
// NewServer returns a new gRPC server.
@@ -164,8 +165,8 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
traceRegistryServer: &traceRegistryServer{
schemaRegistry: schemaRegistry,
},
- schemaRepo: schemaRegistry,
- cfg: auth.InitCfg(),
+ schemaRepo: schemaRegistry,
+ authReloader: auth.InitAuthReloader(),
}
s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC}
@@ -190,7 +191,7 @@ func (s *server) PreRun(_ context.Context) error {
}
}
if s.authConfigFile != "" {
- if err := auth.LoadConfig(s.cfg, s.authConfigFile); err != nil {
+ if err := s.authReloader.ConfigAuthReloader(s.authConfigFile,
s.healthAuthEnabled, s.log); err != nil {
return err
}
}
@@ -239,9 +240,9 @@ func (s *server) GetPort() *uint32 {
return &s.port
}
-// GetAuthCfg returns auth cfg (for httpserver).
-func (s *server) GetAuthCfg() *auth.Config {
- return s.cfg
+// GetAuthReloader returns auth reloader (for httpserver).
+func (s *server) GetAuthReloader() *auth.Reloader {
+ return s.authReloader
}
func (s *server) FlagSet() *run.FlagSet {
@@ -252,7 +253,7 @@ func (s *server) FlagSet() *run.FlagSet {
fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file")
fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file")
fs.StringVar(&s.authConfigFile, "auth-config-file", "", "Path to the
authentication config file (YAML format)")
- fs.BoolVar(&s.cfg.HealthAuthEnabled, "enable-health-auth", false,
"enable authentication for health check")
+ fs.BoolVar(&s.healthAuthEnabled, "enable-health-auth", false, "enable
authentication for health check")
fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens")
fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log",
false, "enable ingestion access log")
@@ -300,6 +301,14 @@ func (s *server) Serve() run.StopNotify {
creds := credentials.NewTLS(tlsConfig)
opts = append(opts, grpclib.Creds(creds))
}
+ if s.authConfigFile != "" {
+ if err := s.authReloader.Start(); err != nil {
+ s.log.Error().Err(err).Msg("Failed to start
authReloader for gRPC")
+ close(s.stopCh)
+ return s.stopCh
+ }
+ s.log.Info().Str("authConfigFile",
s.authConfigFile).Msg("Starting auth config file monitoring")
+ }
grpcPanicRecoveryHandler := func(p any) (err error) {
s.log.Error().Interface("panic", p).Str("stack",
string(debug.Stack())).Msg("recovered from panic")
s.metrics.totalPanic.Inc(1)
@@ -315,8 +324,8 @@ func (s *server) Serve() run.StopNotify {
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
}
if s.authConfigFile != "" {
- streamChain = append(streamChain, authStreamInterceptor(s.cfg))
- unaryChain = append(unaryChain, authInterceptor(s.cfg))
+ streamChain = append(streamChain,
authStreamInterceptor(s.authReloader))
+ unaryChain = append(unaryChain, authInterceptor(s.authReloader))
}
opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
@@ -365,6 +374,9 @@ func (s *server) GracefulStop() {
if s.tls && s.tlsReloader != nil {
s.tlsReloader.Stop()
}
+ if s.authConfigFile != "" && s.authReloader != nil {
+ s.authReloader.Stop()
+ }
stopped := make(chan struct{})
go func() {
s.ser.GracefulStop()
diff --git a/banyand/liaison/http/auth.go b/banyand/liaison/http/auth.go
index 716e73f4..141b5846 100644
--- a/banyand/liaison/http/auth.go
+++ b/banyand/liaison/http/auth.go
@@ -29,14 +29,14 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
)
-func authMiddleware(cfg *auth.Config) func(http.Handler) http.Handler {
+func authMiddleware(authReloader *auth.Reloader) func(http.Handler)
http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r
*http.Request) {
if isStaticPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
-
+ cfg := authReloader.GetConfig()
if !cfg.Enabled {
next.ServeHTTP(w, r)
return
@@ -77,7 +77,7 @@ func authMiddleware(cfg *auth.Config) func(http.Handler)
http.Handler {
username := parts[0]
password := parts[1]
- if !auth.CheckUsernameAndPassword(cfg, username,
password) {
+ if !authReloader.CheckUsernameAndPassword(username,
password) {
http.Error(w, `{"error": "invalid
credentials"}`, http.StatusUnauthorized)
return
}
@@ -105,9 +105,9 @@ func isStaticPath(path string) bool {
return false
}
-func buildGRPCContextForHealthCheck(cfg *auth.Config, r *http.Request)
(context.Context, error) {
+func buildGRPCContextForHealthCheck(authReloader *auth.Reloader, r
*http.Request) (context.Context, error) {
ctx := r.Context()
-
+ cfg := authReloader.GetConfig()
if cfg.HealthAuthEnabled {
username := r.Header.Get("Grpc-Metadata-Username")
password := r.Header.Get("Grpc-Metadata-Password")
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index b0a88241..20bac794 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -60,10 +60,10 @@ var (
)
// NewServer return a http service.
-func NewServer(cfg *auth.Config) Server {
+func NewServer(authReloader *auth.Reloader) Server {
return &server{
- stopCh: make(chan struct{}),
- cfg: cfg,
+ stopCh: make(chan struct{}),
+ authReloader: authReloader,
}
}
@@ -90,7 +90,7 @@ type server struct {
grpcAddr string
keyFile string
certFile string
- cfg *auth.Config
+ authReloader *auth.Reloader
grpcCert string
grpcMu sync.Mutex
port uint32
@@ -368,9 +368,9 @@ func (p *server) initGRPCClient() error {
// This avoids the conflict when remounting to /api path
newMux := chi.NewRouter()
- newMux.Use(authMiddleware(p.cfg))
+ newMux.Use(authMiddleware(p.authReloader))
newMux.Handle("/api/healthz", http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
- ctx, err := buildGRPCContextForHealthCheck(p.cfg, r)
+ ctx, err := buildGRPCContextForHealthCheck(p.authReloader, r)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
diff --git a/banyand/liaison/pkg/auth/config.go
b/banyand/liaison/pkg/auth/config.go
deleted file mode 100644
index f595a93d..00000000
--- a/banyand/liaison/pkg/auth/config.go
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package auth provides configuration management and validation logic for
authentication.
-package auth
-
-import (
- "crypto/subtle"
- "fmt"
- "os"
- "strings"
-
- "sigs.k8s.io/yaml"
-)
-
-// Config AuthConfig.
-type Config struct {
- Users []User `yaml:"users"`
- Enabled bool `yaml:"-"`
- HealthAuthEnabled bool `yaml:"-"`
-}
-
-// User details from config file.
-type User struct {
- Username string `yaml:"username"`
- Password string `yaml:"password"`
-}
-
-// InitCfg returns Config with default values.
-func InitCfg() *Config {
- return &Config{
- Enabled: false,
- HealthAuthEnabled: false,
- Users: []User{},
- }
-}
-
-// LoadConfig implements the reading of the authentication configuration.
-func LoadConfig(cfg *Config, filePath string) error {
- cfg.Enabled = true
-
- info, err := os.Stat(filePath)
- if err != nil {
- return err
- }
- perm := info.Mode().Perm()
- if perm != 0o600 {
- return fmt.Errorf("config file %s has unsafe permissions: %o
(expected 0600)", filePath, perm)
- }
-
- data, err := os.ReadFile(filePath)
- if err != nil {
- return err
- }
- err = yaml.Unmarshal(data, cfg)
- if err != nil {
- return err
- }
- return nil
-}
-
-// CheckUsernameAndPassword returns true if the provided username and password
match any configured user.
-func CheckUsernameAndPassword(cfg *Config, username, password string) bool {
- username = strings.TrimSpace(username)
- password = strings.TrimSpace(password)
-
- for _, user := range cfg.Users {
- storedUsername := strings.TrimSpace(user.Username)
- storedPassword := strings.TrimSpace(user.Password)
-
- // Convert to []byte
- usernameBytes := []byte(username)
- storedUsernameBytes := []byte(storedUsername)
- passwordBytes := []byte(password)
- storedPasswordBytes := []byte(storedPassword)
-
- // Length must match
- if len(usernameBytes) != len(storedUsernameBytes) ||
len(passwordBytes) != len(storedPasswordBytes) {
- continue
- }
-
- // Use constant-time comparison
- usernameMatch := subtle.ConstantTimeCompare(usernameBytes,
storedUsernameBytes) == 1
- passwordMatch := subtle.ConstantTimeCompare(passwordBytes,
storedPasswordBytes) == 1
-
- if usernameMatch && passwordMatch {
- return true
- }
- }
- return false
-}
diff --git a/banyand/liaison/pkg/auth/reloader.go
b/banyand/liaison/pkg/auth/reloader.go
new file mode 100644
index 00000000..c593e309
--- /dev/null
+++ b/banyand/liaison/pkg/auth/reloader.go
@@ -0,0 +1,302 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package auth provides configuration management and validation logic for
authentication.
+package auth
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "crypto/subtle"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/fsnotify/fsnotify"
+ "github.com/pkg/errors"
+ "sigs.k8s.io/yaml"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Config AuthConfig.
+type Config struct {
+ Users []User `yaml:"users"`
+ Enabled bool `yaml:"-"`
+ HealthAuthEnabled bool `yaml:"-"`
+}
+
+// User details from config file.
+type User struct {
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+}
+
+// InitCfg returns Config with default values.
+func InitCfg() *Config {
+ return &Config{
+ Enabled: false,
+ HealthAuthEnabled: false,
+ Users: []User{},
+ }
+}
+
+// loadConfig implements the reading of the authentication configuration.
+func (ar *Reloader) loadConfig(filePath string) error {
+ if filePath == "" {
+ return errors.New("configFile must be provided")
+ }
+ info, err := os.Stat(filePath)
+ if err != nil {
+ return err
+ }
+ perm := info.Mode().Perm()
+ if perm != 0o600 {
+ return fmt.Errorf("config file %s has unsafe permissions: %o
(expected 0600)", filePath, perm)
+ }
+
+ data, err := os.ReadFile(filePath)
+ if err != nil {
+ return err
+ }
+ newCfg := InitCfg()
+ err = yaml.Unmarshal(data, newCfg)
+ if err != nil {
+ return err
+ }
+ ar.setAuthEnabled(true)
+ ar.setUsers(newCfg.Users)
+ return nil
+}
+
+// Reloader manages dynamic reloading of auth config.
+type Reloader struct {
+ debounceTimer *time.Timer
+ updateCh chan struct{}
+ configFile string
+ Config *Config
+ watcher *fsnotify.Watcher
+ log *logger.Logger
+ lastConfigHash []byte
+ mu sync.RWMutex
+}
+
+// InitAuthReloader returns Reloader with default values.
+func InitAuthReloader() *Reloader {
+ return &Reloader{
+ Config: InitCfg(),
+ }
+}
+
+// ConfigAuthReloader returns a Reloader instance with properties populated.
+func (ar *Reloader) ConfigAuthReloader(configFile string, healthAuthEnabled
bool, log *logger.Logger) error {
+ if configFile == "" {
+ return errors.New("configFile must be provided")
+ }
+ if log == nil {
+ return errors.New("logger must not be nil")
+ }
+ err := ar.loadConfig(configFile)
+ if err != nil {
+ return errors.Wrapf(err, "failed to load initial auth config
from %s", configFile)
+ }
+ cfg := ar.GetConfig()
+ ar.setHealthAuthEnabled(healthAuthEnabled)
+
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return errors.Wrap(err, "failed to create fsnotify watcher")
+ }
+
+ ar.setConfig(cfg)
+ ar.configFile = configFile
+ ar.log = log
+ ar.watcher = watcher
+ ar.updateCh = make(chan struct{}, 1)
+ ar.lastConfigHash, _ = ar.computeFileHash(configFile)
+
+ return nil
+}
+
+// Start begins monitoring the config file.
+func (ar *Reloader) Start() error {
+ if err := ar.watcher.Add(ar.configFile); err != nil {
+ return errors.Wrapf(err, "failed to watch config file: %s",
ar.configFile)
+ }
+
+ go ar.watchFiles()
+ return nil
+}
+
+// Stop stops the watcher.
+func (ar *Reloader) Stop() {
+ _ = ar.watcher.Close()
+}
+
+// GetConfig returns the current config (safe for concurrent use).
+func (ar *Reloader) GetConfig() *Config {
+ ar.mu.RLock()
+ defer ar.mu.RUnlock()
+ return ar.Config
+}
+
+func (ar *Reloader) setConfig(cfg *Config) {
+ ar.mu.Lock()
+ defer ar.mu.Unlock()
+ ar.Config = cfg
+}
+
+func (ar *Reloader) setHealthAuthEnabled(enabled bool) {
+ ar.mu.Lock()
+ defer ar.mu.Unlock()
+ ar.Config.HealthAuthEnabled = enabled
+}
+
+func (ar *Reloader) setAuthEnabled(enabled bool) {
+ ar.mu.Lock()
+ defer ar.mu.Unlock()
+ ar.Config.Enabled = enabled
+}
+
+func (ar *Reloader) setUsers(users []User) {
+ ar.mu.Lock()
+ defer ar.mu.Unlock()
+ ar.Config.Users = users
+}
+
+// CheckUsernameAndPassword returns true if the provided username and password
match any configured user.
+func (ar *Reloader) CheckUsernameAndPassword(username, password string) bool {
+ cfg := ar.GetConfig()
+
+ username = strings.TrimSpace(username)
+ password = strings.TrimSpace(password)
+
+ for _, user := range cfg.Users {
+ storedUsername := strings.TrimSpace(user.Username)
+ storedPassword := strings.TrimSpace(user.Password)
+
+ // Convert to []byte
+ usernameBytes := []byte(username)
+ storedUsernameBytes := []byte(storedUsername)
+ passwordBytes := []byte(password)
+ storedPasswordBytes := []byte(storedPassword)
+
+ // Length must match
+ if len(usernameBytes) != len(storedUsernameBytes) ||
len(passwordBytes) != len(storedPasswordBytes) {
+ continue
+ }
+
+ // Use constant-time comparison
+ usernameMatch := subtle.ConstantTimeCompare(usernameBytes,
storedUsernameBytes) == 1
+ passwordMatch := subtle.ConstantTimeCompare(passwordBytes,
storedPasswordBytes) == 1
+
+ if usernameMatch && passwordMatch {
+ return true
+ }
+ }
+ return false
+}
+
+// watchFiles listens for config changes.
+func (ar *Reloader) watchFiles() {
+ for {
+ if ar.watcher == nil {
+ ar.log.Error().Msg("watcher is nil, exiting watchFiles")
+ return
+ }
+ select {
+ case event, ok := <-ar.watcher.Events:
+ if !ok {
+ return
+ }
+ ar.log.Debug().Str("file", event.Name).Str("op",
event.Op.String()).Msg("Detected auth file event")
+ if
event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) != 0 {
+ ar.scheduleReloadAttempt()
+ }
+ case err, ok := <-ar.watcher.Errors:
+ if !ok {
+ return
+ }
+ ar.log.Error().Err(err).Msg("watcher error")
+ }
+ }
+}
+
+// scheduleReloadAttempt debounces reload attempts.
+func (ar *Reloader) scheduleReloadAttempt() {
+ if ar.debounceTimer == nil {
+ ar.debounceTimer = time.AfterFunc(500*time.Millisecond,
ar.tryReload)
+ } else {
+ ar.debounceTimer.Reset(500 * time.Millisecond)
+ }
+}
+
+// tryReload reloads config if changed.
+func (ar *Reloader) tryReload() {
+ changed, newHash, err := ar.checkContentChanged()
+ if err != nil {
+ ar.log.Error().Err(err).Msg("error checking config change")
+ return
+ }
+ if !changed {
+ return
+ }
+
+ err = ar.loadConfig(ar.configFile)
+ if err != nil {
+ ar.log.Error().Err(err).Msg("failed to reload config")
+ return
+ }
+
+ ar.mu.Lock()
+ ar.lastConfigHash = newHash
+ ar.mu.Unlock()
+
+ // notify
+ select {
+ case ar.updateCh <- struct{}{}:
+ default:
+ }
+ ar.log.Info().Msg("auth config updated in memory")
+}
+
+// checkContentChanged compares file hash.
+func (ar *Reloader) checkContentChanged() (bool, []byte, error) {
+ currentHash, err := ar.computeFileHash(ar.configFile)
+ if err != nil {
+ return false, nil, err
+ }
+ return !bytes.Equal(ar.lastConfigHash, currentHash), currentHash, nil
+}
+
+// computeFileHash computes sha256 of file.
+func (ar *Reloader) computeFileHash(filePath string) ([]byte, error) {
+ content, err := os.ReadFile(filePath)
+ if err != nil {
+ return nil, err
+ }
+ h := sha256.New()
+ h.Write(content)
+ return h.Sum(nil), nil
+}
+
+// GetUpdateChannel allows external consumers to watch for updates.
+func (ar *Reloader) GetUpdateChannel() <-chan struct{} {
+ return ar.updateCh
+}
diff --git a/banyand/liaison/pkg/auth/reloader_test.go
b/banyand/liaison/pkg/auth/reloader_test.go
new file mode 100644
index 00000000..176ead04
--- /dev/null
+++ b/banyand/liaison/pkg/auth/reloader_test.go
@@ -0,0 +1,122 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func writeConfigFile(t *testing.T, dir, filename, content string) string {
+ t.Helper()
+ path := filepath.Join(dir, filename)
+ err := os.WriteFile(path, []byte(content), 0o600)
+ if err != nil {
+ t.Fatalf("failed to write config file: %v", err)
+ }
+ return path
+}
+
+func TestLoadConfigAndAuthCheck(t *testing.T) {
+ dir := t.TempDir()
+ configYAML := `
+users:
+ - username: "alice"
+ password: "secret"
+ - username: "bob"
+ password: "hunter2"
+`
+ path := writeConfigFile(t, dir, "auth.yaml", configYAML)
+ // init reloader
+ ar := InitAuthReloader()
+ err := ar.loadConfig(path)
+ if err != nil {
+ t.Fatalf("expected loadConfig success, got error: %v", err)
+ }
+ cfg := ar.GetConfig()
+ if len(cfg.Users) != 2 {
+ t.Fatalf("expected 2 users, got %d", len(cfg.Users))
+ }
+ log := logger.GetLogger("auth-test")
+ if err := ar.ConfigAuthReloader(path, false, log); err != nil {
+ t.Fatalf("ConfigAuthReloader failed: %v", err)
+ }
+
+ if !ar.CheckUsernameAndPassword("alice", "secret") {
+ t.Errorf("expected alice/secret to be valid")
+ }
+ if ar.CheckUsernameAndPassword("alice", "wrong") {
+ t.Errorf("expected alice/wrong to be invalid")
+ }
+ if ar.CheckUsernameAndPassword("notexist", "secret") {
+ t.Errorf("expected non-existent user to fail")
+ }
+}
+
+func TestReloaderUpdatesOnFileChange(t *testing.T) {
+ dir := t.TempDir()
+ initialYAML := `
+users:
+ - username: "alice"
+ password: "secret"
+`
+ path := writeConfigFile(t, dir, "auth.yaml", initialYAML)
+
+ ar := InitAuthReloader()
+ log := logger.GetLogger("auth-test")
+ if err := ar.ConfigAuthReloader(path, false, log); err != nil {
+ t.Fatalf("ConfigAuthReloader failed: %v", err)
+ }
+
+ if err := ar.Start(); err != nil {
+ t.Fatalf("failed to start reloader: %v", err)
+ }
+ defer ar.Stop()
+
+ if !ar.CheckUsernameAndPassword("alice", "secret") {
+ t.Fatalf("expected alice/secret to be valid before update")
+ }
+
+ updatedYAML := `
+users:
+ - username: "bob"
+ password: "hunter2"
+`
+ err := os.WriteFile(path, []byte(updatedYAML), 0o600)
+ if err != nil {
+ t.Fatalf("failed to update config file: %v", err)
+ }
+
+ select {
+ case <-ar.GetUpdateChannel():
+ // ok
+ case <-time.After(2 * time.Second):
+ t.Fatalf("timed out waiting for update channel notification")
+ }
+
+ if ar.CheckUsernameAndPassword("alice", "secret") {
+ t.Errorf("alice should no longer be valid after update")
+ }
+ if !ar.CheckUsernameAndPassword("bob", "hunter2") {
+ t.Errorf("expected bob/hunter2 to be valid after update")
+ }
+}
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 97f3f091..5c42365e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -81,7 +81,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
PropertyNodeRegistry:
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client,
propertyNodeSel),
}, metricSvc)
profSvc := observability.NewProfService()
- httpServer := http.NewServer(grpcServer.GetAuthCfg())
+ httpServer := http.NewServer(grpcServer.GetAuthReloader())
var units []run.Unit
units = append(units, runners...)
units = append(units,
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 9bc005aa..90b9293e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -76,7 +76,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
PropertyNodeRegistry: nr,
}, metricSvc)
profSvc := observability.NewProfService()
- httpServer := http.NewServer(grpcServer.GetAuthCfg())
+ httpServer := http.NewServer(grpcServer.GetAuthReloader())
var units []run.Unit
units = append(units, runners...)
diff --git a/test/integration/standalone/other/auth_test.go
b/test/integration/standalone/other/auth_test.go
index 20cc06bc..7ce4eef0 100644
--- a/test/integration/standalone/other/auth_test.go
+++ b/test/integration/standalone/other/auth_test.go
@@ -54,6 +54,8 @@ var _ = g.Describe("Query service_cpm_minute with
authentication", func() {
var goods []gleak.Goroutine
var grpcAddr, httpAddr string
var testUser serverAuth.User
+ var authCfgFile string
+ var httpClient *http.Client
g.BeforeEach(func() {
// load server config.yaml
@@ -62,7 +64,7 @@ var _ = g.Describe("Query service_cpm_minute with
authentication", func() {
tempServerCfg := filepath.Join(os.TempDir(),
fmt.Sprintf(".bydb-%s.yaml", uuid.New().String()))
err = os.WriteFile(tempServerCfg, cfgBytes, 0o600)
gm.Expect(err).NotTo(gm.HaveOccurred())
- authCfgFile := tempServerCfg
+ authCfgFile = tempServerCfg
info, _ := os.Stat(authCfgFile)
gm.Expect(info.Mode().Perm()).To(gm.Equal(os.FileMode(0o600)))
@@ -85,6 +87,7 @@ var _ = g.Describe("Query service_cpm_minute with
authentication", func() {
baseTime = time.Unix(0, ns-ns%int64(time.Minute))
interval = 500 * time.Millisecond
casesMeasureData.WriteWithAuth(conn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", baseTime, interval,
testUser.Username, testUser.Password)
+ httpClient = &http.Client{}
goods = gleak.Goroutines()
})
@@ -95,116 +98,136 @@ var _ = g.Describe("Query service_cpm_minute with
authentication", func() {
})
g.It("grpc query and healthcheck with correct username and password",
func() {
- gm.Eventually(func(innerGm gm.Gomega) {
- casesMeasureData.VerifyFnWithAuth(innerGm,
helpers.SharedContext{
- Connection: conn,
- BaseTime: baseTime,
- },
- helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute},
- testUser.Username, testUser.Password)
- }, flags.EventuallyTimeout).Should(gm.Succeed())
-
+ verifyGRPCQuery(conn, baseTime, testUser.Username,
testUser.Password, true)
opts := make([]grpclib.DialOption, 0, 1)
opts, err := grpchelper.SecureOptions(opts, false, true, "")
gm.Expect(err).ToNot(gm.HaveOccurred())
- gm.Eventually(func() error {
- return helpers.HealthCheckWithAuth(grpcAddr,
10*time.Second, 10*time.Second, testUser.Username, testUser.Password, opts...)()
- }, flags.EventuallyTimeout).Should(gm.Succeed())
+ checkGRPCHealth(grpcAddr, testUser.Username, testUser.Password,
opts, true)
})
g.It("grpc query and healthcheck with wrong username and password",
func() {
- gm.Eventually(func(innerGm gm.Gomega) {
- casesMeasureData.VerifyFnWithAuth(innerGm,
helpers.SharedContext{
- Connection: conn,
- BaseTime: baseTime,
- },
- helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute},
- testUser.Username, testUser.Password+"wrong")
- }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
-
+ verifyGRPCQuery(conn, baseTime, testUser.Username,
testUser.Password+"wrong", false)
opts := make([]grpclib.DialOption, 0, 1)
opts, err := grpchelper.SecureOptions(opts, false, true, "")
gm.Expect(err).ToNot(gm.HaveOccurred())
- gm.Eventually(func() error {
- return helpers.HealthCheckWithAuth(grpcAddr,
10*time.Second, 10*time.Second, testUser.Username, testUser.Password+"wrong",
opts...)()
- }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+ checkGRPCHealth(grpcAddr, testUser.Username,
testUser.Password+"wrong", opts, false)
})
g.It("http query and healthcheck with correct username and password",
func() {
- httpClient := &http.Client{}
groupListURL :=
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
gm.Eventually(func() error {
- req, err := http.NewRequest(http.MethodGet,
groupListURL, nil)
- if err != nil {
- return err
- }
- req.Header.Add("Authorization",
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password))
- resp, respErr := httpClient.Do(req)
- if respErr != nil {
- return respErr
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("unexpected status code: %d",
resp.StatusCode)
- }
- return nil
+ return doHTTPCheck(httpClient, groupListURL,
testUser.Username, testUser.Password, true)
}, flags.EventuallyTimeout).Should(gm.Succeed())
healthCheckURL := fmt.Sprintf("http://%s/api/healthz", httpAddr)
gm.Eventually(func() error {
- req, err := http.NewRequest(http.MethodGet,
healthCheckURL, nil)
- if err != nil {
- return err
- }
- req.Header.Add("Authorization",
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password))
- resp, respErr := httpClient.Do(req)
- if respErr != nil {
- return respErr
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("unexpected status code: %d",
resp.StatusCode)
- }
- return nil
+ return doHTTPCheck(httpClient, healthCheckURL,
testUser.Username, testUser.Password, true)
}, flags.EventuallyTimeout).Should(gm.Succeed())
})
g.It("http query and healthcheck with wrong username and password",
func() {
- httpClient := &http.Client{}
groupListURL :=
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
gm.Eventually(func() error {
- req, err := http.NewRequest(http.MethodGet,
groupListURL, nil)
- if err != nil {
- return err
- }
- req.Header.Add("Authorization",
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password+"wrong"))
- resp, respErr := httpClient.Do(req)
- if respErr != nil {
- return respErr
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("unexpected status code: %d",
resp.StatusCode)
- }
- return nil
+ return doHTTPCheck(httpClient, groupListURL,
testUser.Username, testUser.Password+"wrong", true)
}, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
healthCheckURL := fmt.Sprintf("http://%s/api/healthz", httpAddr)
gm.Eventually(func() error {
- req, err := http.NewRequest(http.MethodGet,
healthCheckURL, nil)
- if err != nil {
- return err
- }
- req.Header.Add("Authorization",
auth.GenerateBasicAuthHeader(testUser.Username, testUser.Password+"wrong"))
- resp, respErr := httpClient.Do(req)
- if respErr != nil {
- return respErr
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("unexpected status code: %d",
resp.StatusCode)
- }
- return nil
+ return doHTTPCheck(httpClient, healthCheckURL,
testUser.Username, testUser.Password+"wrong", true)
+ }, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
+ })
+
+ g.It("queries auth config reload with both gRPC and HTTP", func() {
+ // query with original username and password
+ groupListURL :=
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
+ gm.Eventually(func() error {
+ return doHTTPCheck(httpClient, groupListURL,
testUser.Username, testUser.Password, true)
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ // 1. Generate a new username/password and modify the original auth configuration file
+ newUser := serverAuth.User{Username: "updated-user", Password:
"updated-pass"}
+ cfgBytes, err := os.ReadFile(authCfgFile)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ var cfg serverAuth.Config
+ err = yaml.Unmarshal(cfgBytes, &cfg)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ cfg.Users = []serverAuth.User{newUser}
+ newCfgBytes, err := yaml.Marshal(cfg)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ err = os.WriteFile(authCfgFile, newCfgBytes, 0o600)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ // 2. Wait for the server to reload the configuration
+ time.Sleep(2 * time.Second)
+
+ // 3. New gRPC connections use the updated username/password
+ newConn, err := grpchelper.ConnWithAuth(grpcAddr,
10*time.Second,
+ newUser.Username, newUser.Password,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ defer newConn.Close()
+ verifyGRPCQuery(newConn, baseTime, newUser.Username,
newUser.Password, true)
+
+ // 4. Verify that old username/password cannot access gRPC
+ _, err = grpchelper.ConnWithAuth(grpcAddr, 10*time.Second,
+ testUser.Username, testUser.Password,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+ gm.Expect(err).To(gm.HaveOccurred())
+
+ // 5. New HTTP clients use updated username/password
+ groupListURL =
fmt.Sprintf("http://%s/api/v1/group/schema/lists", httpAddr)
+ gm.Eventually(func() error {
+ return doHTTPCheck(httpClient, groupListURL,
newUser.Username, newUser.Password, true)
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+
+ // 6. Verify that the old username/password can no longer access HTTP
+ gm.Eventually(func() error {
+ return doHTTPCheck(httpClient, groupListURL,
testUser.Username, testUser.Password, true)
}, flags.EventuallyTimeout).ShouldNot(gm.Succeed())
})
})
+
+// doHTTPCheck sends a single GET request to url, authenticated with a Basic
+// Authorization header built from username and password, and compares the
+// response status against the caller's expectation.
+//
+// It returns the underlying error when the request cannot be built or sent.
+// With expectOK set, any status other than 200 OK is reported as an error;
+// with expectOK unset, a 200 OK status is reported as an error instead.
+//
+//nolint:unparam
+func doHTTPCheck(client *http.Client, url, username, password string, expectOK bool) error {
+	request, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return err
+	}
+	request.Header.Add("Authorization", auth.GenerateBasicAuthHeader(username, password))
+
+	response, err := client.Do(request)
+	if err != nil {
+		return err
+	}
+	defer response.Body.Close()
+
+	// Only a mismatch between the expectation and the observed status is an error.
+	gotOK := response.StatusCode == http.StatusOK
+	switch {
+	case expectOK && !gotOK:
+		return fmt.Errorf("unexpected status code: %d", response.StatusCode)
+	case !expectOK && gotOK:
+		return fmt.Errorf("expected failure but got %d", response.StatusCode)
+	default:
+		return nil
+	}
+}
+
+// verifyGRPCQuery runs the "all" measure-data verification over conn with the
+// given username and password, retrying through gm.Eventually for up to
+// flags.EventuallyTimeout. When expectOK is true the verification must end up
+// succeeding; otherwise it must not succeed.
+func verifyGRPCQuery(conn *grpclib.ClientConn, baseTime time.Time, username, password string, expectOK bool) {
+	pending := gm.Eventually(func(innerGm gm.Gomega) {
+		casesMeasureData.VerifyFnWithAuth(
+			innerGm,
+			helpers.SharedContext{Connection: conn, BaseTime: baseTime},
+			helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute},
+			username, password,
+		)
+	}, flags.EventuallyTimeout)
+
+	if !expectOK {
+		pending.ShouldNot(gm.Succeed())
+		return
+	}
+	pending.Should(gm.Succeed())
+}
+
+// checkGRPCHealth builds an authenticated health-check probe for grpcAddr
+// (10-second values for both duration arguments, plus the supplied dial
+// options) and retries it through gm.Eventually for up to
+// flags.EventuallyTimeout. When expectOK is true the probe must end up
+// succeeding; otherwise it must not succeed.
+func checkGRPCHealth(grpcAddr string, username, password string, opts []grpclib.DialOption, expectOK bool) {
+	probe := helpers.HealthCheckWithAuth(grpcAddr, 10*time.Second, 10*time.Second, username, password, opts...)
+	pending := gm.Eventually(probe, flags.EventuallyTimeout)
+
+	if !expectOK {
+		pending.ShouldNot(gm.Succeed())
+		return
+	}
+	pending.Should(gm.Succeed())
+}