This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 62b418e5 Add new gRPC and HTTP service for fetching all nodes has
discovery in each liaison/data node (#927)
62b418e5 is described below
commit 62b418e564a6146e9a56ad61d4126fe59b398373
Author: mrproliu <[email protected]>
AuthorDate: Sat Jan 10 11:36:04 2026 +0800
Add new gRPC and HTTP service for fetching all nodes has discovery in each
liaison/data node (#927)
* Add new gRPC and HTTP service for fetching all nodes has discovery in
each liaison/data node
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
api/proto/banyandb/database/v1/rpc.proto | 26 +++
banyand/backup/lifecycle/cluster_state.go | 127 +++++++++++
banyand/backup/lifecycle/service.go | 251 ++++++++++++++++++++-
banyand/backup/lifecycle/steps.go | 9 +-
banyand/liaison/grpc/registry_test.go | 2 +-
.../node.go => liaison/grpc/route/route_table.go} | 19 +-
banyand/liaison/grpc/server.go | 35 ++-
banyand/liaison/grpc/server_test.go | 6 +-
banyand/liaison/http/server.go | 1 +
banyand/metadata/dns/dns.go | 6 +-
banyand/property/gossip/message.go | 2 +
banyand/property/gossip/service.go | 22 ++
banyand/property/property.go | 2 +
banyand/property/service.go | 8 +
banyand/queue/local.go | 15 ++
banyand/queue/pub/pub.go | 34 +++
banyand/queue/queue.go | 3 +
banyand/queue/sub/node.go | 24 ++
banyand/queue/sub/server.go | 72 +++---
docs/api-reference.md | 74 ++++++
pkg/cmdsetup/data.go | 5 +
pkg/cmdsetup/liaison.go | 8 +-
pkg/cmdsetup/standalone.go | 2 +-
.../cluster_state/cluster_state_suite_test.go | 141 ++++++++++++
24 files changed, 834 insertions(+), 60 deletions(-)
diff --git a/api/proto/banyandb/database/v1/rpc.proto
b/api/proto/banyandb/database/v1/rpc.proto
index 4c8d106f..6de71e80 100644
--- a/api/proto/banyandb/database/v1/rpc.proto
+++ b/api/proto/banyandb/database/v1/rpc.proto
@@ -684,3 +684,29 @@ message GetCurrentNodeResponse {
service NodeQueryService {
rpc GetCurrentNode(GetCurrentNodeRequest) returns (GetCurrentNodeResponse) {}
}
+
+message GetClusterStateRequest {}
+
+// RouteTable represents a collection of nodes grouped by their health state.
+// It provides a view of nodes that are registered, actively healthy, and
those being evicted.
+message RouteTable {
+ // registered contains all nodes that have been discovered and registered in
this route.
+ repeated banyandb.database.v1.Node registered = 1;
+ // active contains node names (Node.Metadata.Name) that are currently
healthy and can handle requests.
+ repeated string active = 2;
+ // evictable contains node names (Node.Metadata.Name) that are unhealthy and
being retried before eviction.
+ repeated string evictable = 3;
+}
+
+message GetClusterStateResponse {
+ // Liaison node: map's key could be "tire1" and "tire2". tire1 routes traffic
between liaison nodes, tire2 spreads data among data nodes.
+ // Data node: map's key could be "property" for gossip.
+ // Lifecycle agent: map's key is "lifecycle", an aggregate of every processed group's route table.
+ map<string, RouteTable> route_tables = 1;
+}
+
+service ClusterStateService {
+ rpc GetClusterState(GetClusterStateRequest) returns
(GetClusterStateResponse) {
+ option (google.api.http) = {get: "/v1/cluster/state"};
+ }
+}
diff --git a/banyand/backup/lifecycle/cluster_state.go
b/banyand/backup/lifecycle/cluster_state.go
new file mode 100644
index 00000000..17a4ddd2
--- /dev/null
+++ b/banyand/backup/lifecycle/cluster_state.go
@@ -0,0 +1,127 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// clusterStateManager manages the aggregated RouteTable snapshot from all
lifecycle groups.
+type clusterStateManager struct {
+ lastUpdateTime time.Time
+ aggregatedTable *databasev1.RouteTable
+ mu sync.RWMutex
+}
+
+func (m *clusterStateManager) addRouteTable(rt *databasev1.RouteTable) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.aggregatedTable == nil {
+ m.aggregatedTable = &databasev1.RouteTable{
+ Registered: []*databasev1.Node{},
+ Active: []string{},
+ Evictable: []string{},
+ }
+ }
+
+ // deduplicate registered nodes using map keyed by node name
+ nodeMap := make(map[string]*databasev1.Node)
+ for _, node := range m.aggregatedTable.Registered {
+ if node != nil && node.Metadata != nil {
+ nodeMap[node.Metadata.Name] = node
+ }
+ }
+ for _, node := range rt.Registered {
+ if node != nil && node.Metadata != nil {
+ nodeMap[node.Metadata.Name] = node
+ }
+ }
+
+ // deduplicate active node names
+ activeSet := make(map[string]bool)
+ for _, name := range m.aggregatedTable.Active {
+ activeSet[name] = true
+ }
+ for _, name := range rt.Active {
+ activeSet[name] = true
+ }
+
+ // Deduplicate evictable node names
+ evictableSet := make(map[string]bool)
+ for _, name := range m.aggregatedTable.Evictable {
+ evictableSet[name] = true
+ }
+ for _, name := range rt.Evictable {
+ evictableSet[name] = true
+ }
+
+ // Convert maps back to slices
+ m.aggregatedTable.Registered = make([]*databasev1.Node, 0, len(nodeMap))
+ for _, node := range nodeMap {
+ m.aggregatedTable.Registered =
append(m.aggregatedTable.Registered, node)
+ }
+
+ m.aggregatedTable.Active = make([]string, 0, len(activeSet))
+ for name := range activeSet {
+ m.aggregatedTable.Active = append(m.aggregatedTable.Active,
name)
+ }
+
+ m.aggregatedTable.Evictable = make([]string, 0, len(evictableSet))
+ for name := range evictableSet {
+ m.aggregatedTable.Evictable =
append(m.aggregatedTable.Evictable, name)
+ }
+
+ m.lastUpdateTime = time.Now()
+}
+
+func (m *clusterStateManager) getSnapshot() *databasev1.RouteTable {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ if m.aggregatedTable == nil {
+ return &databasev1.RouteTable{
+ Registered: []*databasev1.Node{},
+ Active: []string{},
+ Evictable: []string{},
+ }
+ }
+
+ // deep copy to avoid concurrent modification
+ return proto.Clone(m.aggregatedTable).(*databasev1.RouteTable)
+}
+
+// GetClusterState implements the ClusterStateService.GetClusterState RPC.
+// It returns the aggregated cluster state under the "lifecycle" key.
+func (l *lifecycleService) GetClusterState(_ context.Context, _
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse,
error) {
+ routeTable := l.clusterStateMgr.getSnapshot()
+
+ routeTables := map[string]*databasev1.RouteTable{
+ "lifecycle": routeTable,
+ }
+
+ return &databasev1.GetClusterStateResponse{
+ RouteTables: routeTables,
+ }, nil
+}
diff --git a/banyand/backup/lifecycle/service.go
b/banyand/backup/lifecycle/service.go
index c001d72f..d64193b5 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -20,16 +20,27 @@ package lifecycle
import (
"context"
"encoding/json"
- "errors"
"fmt"
+ "net"
+ "net/http"
"os"
"path/filepath"
"sort"
+ "strconv"
+ "sync"
"time"
"github.com/benbjohnson/clock"
+ "github.com/go-chi/chi/v5"
+ grpc_validator
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
+ "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
+ "github.com/pkg/errors"
"github.com/robfig/cron/v3"
- "google.golang.org/grpc"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
"github.com/apache/skywalking-banyandb/api/common"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -42,24 +53,34 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/healthcheck"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
)
type service interface {
run.Config
+ run.PreRunner
run.Service
}
var _ service = (*lifecycleService)(nil)
type lifecycleService struct {
+ databasev1.UnimplementedClusterStateServiceServer
metadata metadata.Repo
omr observability.MetricsRegistry
pm protector.Memory
+ clusterStateMgr *clusterStateManager
l *logger.Logger
sch *timestamp.Scheduler
+ grpcServer *grpclib.Server
+ httpSrv *http.Server
+ tlsReloader *pkgtls.Reloader
+ clientCloser context.CancelFunc
+ stopCh chan struct{}
measureRoot string
streamRoot string
traceRoot string
@@ -68,17 +89,26 @@ type lifecycleService struct {
schedule string
cert string
gRPCAddr string
+ lifecycleHost string
+ lifecycleGRPCAddr string
+ lifecycleHTTPAddr string
+ lifecycleCertFile string
+ lifecycleKeyFile string
+ lifecycleGRPCPort uint32
+ lifecycleHTTPPort uint32
maxExecutionTimes int
enableTLS bool
insecure bool
+ lifecycleTLS bool
chunkSize run.Bytes
}
// NewService creates a new lifecycle service.
func NewService(meta metadata.Repo) run.Unit {
ls := &lifecycleService{
- metadata: meta,
- omr: observability.BypassRegistry,
+ metadata: meta,
+ omr: observability.BypassRegistry,
+ clusterStateMgr: &clusterStateManager{},
}
ls.pm = protector.NewMemory(ls.omr)
return ls
@@ -105,10 +135,52 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
flagS.IntVar(&l.maxExecutionTimes, "max-execution-times", 0, "Maximum
number of times to execute the lifecycle migration. 0 means no limit.")
l.chunkSize = run.Bytes(1024 * 1024)
flagS.VarP(&l.chunkSize, "chunk-size", "", "Chunk size in bytes for
streaming data during migration (default: 1MB)")
+
+ // Lifecycle server flags
+ flagS.BoolVar(&l.lifecycleTLS, "lifecycle-tls", false, "connection uses
TLS if true, else plain TCP")
+ flagS.StringVar(&l.lifecycleCertFile, "lifecycle-cert-file", "", "the
TLS cert file")
+ flagS.StringVar(&l.lifecycleKeyFile, "lifecycle-key-file", "", "the TLS
key file")
+ flagS.StringVar(&l.lifecycleHost, "lifecycle-grpc-host", "", "the host
of lifecycle server listens")
+ flagS.Uint32Var(&l.lifecycleGRPCPort, "lifecycle-grpc-port", 17912,
"the port of lifecycle server listens")
+ flagS.Uint32Var(&l.lifecycleHTTPPort, "lifecycle-http-port", 17913,
"the port of lifecycle http api listens")
+
return flagS
}
func (l *lifecycleService) Validate() error {
+ if l.schedule != "" {
+ l.lifecycleGRPCAddr = net.JoinHostPort(l.lifecycleHost,
strconv.FormatUint(uint64(l.lifecycleGRPCPort), 10))
+ if l.lifecycleGRPCAddr == ":" {
+ return errors.New("no gRPC address")
+ }
+ l.lifecycleHTTPAddr = net.JoinHostPort(l.lifecycleHost,
strconv.FormatUint(uint64(l.lifecycleHTTPPort), 10))
+ if l.lifecycleHTTPAddr == ":" {
+ return errors.New("no HTTP address")
+ }
+ if l.lifecycleTLS {
+ if l.lifecycleCertFile == "" {
+ return errors.New("missing cert file when TLS
is enabled")
+ }
+ if l.lifecycleKeyFile == "" {
+ return errors.New("missing key file when TLS is
enabled")
+ }
+ }
+ }
+ return nil
+}
+
+// PreRun sets up the logger and, when a schedule is configured with TLS enabled, the TLS reloader used by the embedded server.
+func (l *lifecycleService) PreRun(_ context.Context) error {
+ l.l = logger.GetLogger("lifecycle")
+
+ if l.schedule != "" && l.lifecycleTLS {
+ var err error
+ l.tlsReloader, err = pkgtls.NewReloader(l.lifecycleCertFile,
l.lifecycleKeyFile, l.l)
+ if err != nil {
+ return errors.Wrap(err, "failed to initialize TLS
reloader")
+ }
+ }
+
return nil
}
@@ -116,6 +188,44 @@ func (l *lifecycleService) GracefulStop() {
if l.sch != nil {
l.sch.Close()
}
+
+ l.l.Info().Msg("Stopping lifecycle server")
+
+ if l.tlsReloader != nil {
+ l.tlsReloader.Stop()
+ }
+
+ if l.clientCloser != nil {
+ l.clientCloser()
+ }
+
+ // Stop HTTP server
+ if l.httpSrv != nil {
+ ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
+ defer cancel()
+ if shutdownErr := l.httpSrv.Shutdown(ctx); shutdownErr != nil {
+ l.l.Warn().Err(shutdownErr).Msg("HTTP server shutdown
error")
+ }
+ }
+
+ // Stop gRPC server
+ if l.grpcServer != nil {
+ stopped := make(chan struct{})
+ go func() {
+ l.grpcServer.GracefulStop()
+ close(stopped)
+ }()
+
+ t := time.NewTimer(10 * time.Second)
+ select {
+ case <-t.C:
+ l.grpcServer.Stop()
+ l.l.Info().Msg("Lifecycle server force stopped")
+ case <-stopped:
+ t.Stop()
+ l.l.Info().Msg("Lifecycle server stopped gracefully")
+ }
+ }
}
func (l *lifecycleService) Name() string {
@@ -124,6 +234,13 @@ func (l *lifecycleService) Name() string {
func (l *lifecycleService) Serve() run.StopNotify {
l.l = logger.GetLogger("lifecycle")
+ l.stopCh = make(chan struct{})
+
+ // Start gRPC/HTTP servers when schedule is set
+ if l.schedule != "" {
+ l.startServers()
+ }
+
done := make(chan struct{})
if l.schedule == "" {
defer close(done)
@@ -152,11 +269,125 @@ func (l *lifecycleService) Serve() run.StopNotify {
})
if err != nil {
l.l.Error().Err(err).Msg("failed to register lifecycle
migration schedule")
+ close(done)
return done
}
+
+ // Wait for either migration completion or server stop
+ go func() {
+ select {
+ case <-done:
+ // Migration completed
+ case <-l.stopCh:
+ // Server stopped
+ close(done)
+ }
+ }()
+
return done
}
+func (l *lifecycleService) startServers() {
+ // Setup gRPC server
+ var opts []grpclib.ServerOption
+ if l.lifecycleTLS && l.tlsReloader != nil {
+ if startErr := l.tlsReloader.Start(); startErr != nil {
+ l.l.Error().Err(startErr).Msg("Failed to start TLS
reloader")
+ close(l.stopCh)
+ return
+ }
+ tlsConfig := l.tlsReloader.GetTLSConfig()
+ creds := credentials.NewTLS(tlsConfig)
+ opts = append(opts, grpclib.Creds(creds))
+ }
+
+ opts = append(opts,
+ grpclib.ChainUnaryInterceptor(
+ grpc_validator.UnaryServerInterceptor(),
+ ),
+ )
+
+ l.grpcServer = grpclib.NewServer(opts...)
+ databasev1.RegisterClusterStateServiceServer(l.grpcServer, l)
+ grpc_health_v1.RegisterHealthServer(l.grpcServer, health.NewServer())
+
+ // Setup HTTP server
+ var ctx context.Context
+ ctx, l.clientCloser = context.WithCancel(context.Background())
+
+ clientOpts := make([]grpclib.DialOption, 0, 1)
+ if l.lifecycleTLS && l.tlsReloader != nil {
+ tlsConfig := l.tlsReloader.GetTLSConfig()
+ creds := credentials.NewTLS(tlsConfig)
+ clientOpts = append(clientOpts,
grpclib.WithTransportCredentials(creds))
+ } else {
+ clientOpts = append(clientOpts,
grpclib.WithTransportCredentials(insecure.NewCredentials()))
+ }
+
+ client, err := healthcheck.NewClient(ctx, l.l, l.lifecycleGRPCAddr,
clientOpts)
+ if err != nil {
+ l.l.Error().Err(err).Msg("Failed to create health check client")
+ close(l.stopCh)
+ return
+ }
+
+ gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
+ if registerErr :=
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(ctx, gwMux,
l.lifecycleGRPCAddr, clientOpts); registerErr != nil {
+ l.l.Error().Err(registerErr).Msg("Failed to register cluster
state service")
+ close(l.stopCh)
+ return
+ }
+
+ mux := chi.NewRouter()
+ mux.Mount("/api", http.StripPrefix("/api", gwMux))
+
+ l.httpSrv = &http.Server{
+ Addr: l.lifecycleHTTPAddr,
+ Handler: mux,
+ ReadHeaderTimeout: 3 * time.Second,
+ }
+
+ // Start both servers
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ // gRPC server goroutine
+ go func() {
+ defer wg.Done()
+ lis, listenErr := net.Listen("tcp", l.lifecycleGRPCAddr)
+ if listenErr != nil {
+ l.l.Error().Err(listenErr).Msg("Failed to listen on
gRPC addr")
+ return
+ }
+ l.l.Info().Str("addr", l.lifecycleGRPCAddr).Msg("Lifecycle gRPC
server listening")
+ if serveErr := l.grpcServer.Serve(lis); serveErr != nil {
+ l.l.Error().Err(serveErr).Msg("gRPC server error")
+ }
+ }()
+
+ // HTTP server goroutine
+ go func() {
+ defer wg.Done()
+ l.l.Info().Str("addr", l.lifecycleHTTPAddr).Msg("Lifecycle HTTP
server listening")
+ var serveErr error
+ if l.lifecycleTLS && l.tlsReloader != nil {
+ l.httpSrv.TLSConfig = l.tlsReloader.GetTLSConfig()
+ serveErr = l.httpSrv.ListenAndServeTLS("", "")
+ } else {
+ serveErr = l.httpSrv.ListenAndServe()
+ }
+ if serveErr != nil && serveErr != http.ErrServerClosed {
+ l.l.Error().Err(serveErr).Msg("HTTP server error")
+ }
+ }()
+
+ // Wait for both servers to stop
+ go func() {
+ wg.Wait()
+ close(l.stopCh)
+ }()
+}
+
func (l *lifecycleService) action() error {
ctx := context.Background()
progress := LoadProgress(l.progressFilePath, l.l)
@@ -564,7 +795,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx
context.Context, progress *Pro
func (l *lifecycleService) processStreamGroup(ctx context.Context, g
*commonv1.Group,
streamDir string, nodes []*databasev1.Node, labels map[string]string,
progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
@@ -663,7 +894,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx
context.Context, g *c
return
}
- resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpc.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) {
+ resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpclib.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error)
{
client := streamv1.NewStreamServiceClient(conn)
return client.DeleteExpiredSegments(ctx,
&streamv1.DeleteExpiredSegmentsRequest{
Group: g.Metadata.Name,
@@ -683,7 +914,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx
context.Context, g *c
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g
*commonv1.Group, measureDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
@@ -746,7 +977,7 @@ func (l *lifecycleService) deleteExpiredMeasureSegments(ctx
context.Context, g *
return
}
- resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpc.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, error) {
+ resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpclib.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse,
error) {
client := measurev1.NewMeasureServiceClient(conn)
return client.DeleteExpiredSegments(ctx,
&measurev1.DeleteExpiredSegmentsRequest{
Group: g.Metadata.Name,
@@ -769,7 +1000,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx
context.Context, g *co
return
}
- resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpc.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
+ resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert,
func(conn *grpclib.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
client := tracev1.NewTraceServiceClient(conn)
return client.DeleteExpiredSegments(ctx,
&tracev1.DeleteExpiredSegmentsRequest{
Group: g.Metadata.Name,
@@ -789,7 +1020,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx
context.Context, g *co
func (l *lifecycleService) processTraceGroup(ctx context.Context, g
*commonv1.Group, traceDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata,
l.clusterStateMgr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
return
diff --git a/banyand/backup/lifecycle/steps.go
b/banyand/backup/lifecycle/steps.go
index 6e405e87..1ca5c333 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -101,8 +101,9 @@ func (gc *GroupConfig) Close() {
}
}
-func parseGroup(g *commonv1.Group, nodeLabels map[string]string, nodes
[]*databasev1.Node,
- l *logger.Logger, metadata metadata.Repo,
+func parseGroup(
+ g *commonv1.Group, nodeLabels map[string]string, nodes
[]*databasev1.Node,
+ l *logger.Logger, metadata metadata.Repo, clusterStateMgr
*clusterStateManager,
) (*GroupConfig, error) {
ro := g.ResourceOpts
if ro == nil {
@@ -172,6 +173,10 @@ func parseGroup(g *commonv1.Group, nodeLabels
map[string]string, nodes []*databa
if !existed {
return nil, errors.New("no nodes matched")
}
+
+ if t := client.GetRouteTable(); t != nil {
+ clusterStateMgr.addRouteTable(t)
+ }
return &GroupConfig{
Group: g,
TargetShardNum: nst.ShardNum,
diff --git a/banyand/liaison/grpc/registry_test.go
b/banyand/liaison/grpc/registry_test.go
index 95415a08..4bf50291 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -189,7 +189,7 @@ func setupForRegistry() func() {
tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, pipeline,
metaSvc, grpc.NodeRegistries{
MeasureLiaisonNodeRegistry: nr,
PropertyNodeRegistry: nr,
- }, metricSvc, nil)
+ }, metricSvc, nil, nil)
preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/queue/sub/node.go
b/banyand/liaison/grpc/route/route_table.go
similarity index 55%
copy from banyand/queue/sub/node.go
copy to banyand/liaison/grpc/route/route_table.go
index 3ddd5d2a..0fe568fd 100644
--- a/banyand/queue/sub/node.go
+++ b/banyand/liaison/grpc/route/route_table.go
@@ -15,16 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package sub
+// Package route defines interfaces and types for providing route table
information.
+package route
import (
- "context"
-
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
)
-func (s *server) GetCurrentNode(context.Context,
*databasev1.GetCurrentNodeRequest) (*databasev1.GetCurrentNodeResponse, error) {
- return &databasev1.GetCurrentNodeResponse{
- Node: s.curNode,
- }, nil
+// TableProvider provides route table information for the cluster state API.
+// Implementations should return a RouteTable with all registered nodes and
their health states.
+type TableProvider interface {
+ // GetRouteTable returns the current route table state.
+ // The returned RouteTable contains:
+ // - Registered: all nodes known to this provider (full Node
information)
+ // - Active: node names (Node.Metadata.Name) that are currently healthy
+ // - Evictable: node names (Node.Metadata.Name) that are unhealthy and
being retried
+ // The returned RouteTable is a copy and safe for concurrent access.
+ GetRouteTable() *databasev1.RouteTable
}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 07460bd7..c0e02ec4 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -43,6 +43,7 @@ import (
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/liaison/pkg/auth"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -88,6 +89,7 @@ type NodeRegistries struct {
type server struct {
databasev1.UnimplementedSnapshotServiceServer
+ databasev1.UnimplementedClusterStateServiceServer
omr observability.MetricsRegistry
schemaRepo metadata.Repo
protector protector.Memory
@@ -111,6 +113,7 @@ type server struct {
groupRepo *groupRepo
*indexRuleBindingRegistryServer
metrics *metrics
+ routeTableProviders map[string]route.TableProvider
keyFile string
authConfigFile string
addr string
@@ -132,7 +135,7 @@ type server struct {
// NewServer returns a new gRPC server.
func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster
queue.Client,
schemaRegistry metadata.Repo, nr NodeRegistries, omr
observability.MetricsRegistry,
- protectorService protector.Memory,
+ protectorService protector.Memory, routeProviders
map[string]route.TableProvider,
) Server {
gr := &groupRepo{resourceOpts: make(map[string]*commonv1.ResourceOpts)}
er := &entityRepo{entitiesMap: make(map[identity]partition.Locator),
measureMap: make(map[identity]*databasev1.Measure)}
@@ -198,9 +201,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
traceRegistryServer: &traceRegistryServer{
schemaRegistry: schemaRegistry,
},
- schemaRepo: schemaRegistry,
- authReloader: auth.InitAuthReloader(),
- protector: protectorService,
+ schemaRepo: schemaRegistry,
+ authReloader: auth.InitAuthReloader(),
+ protector: protectorService,
+ routeTableProviders: routeProviders,
}
s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC,
traceSVC, s.propertyServer}
s.queryAccessLogRecorders = []queryAccessLogRecorder{streamSVC,
measureSVC, traceSVC, s.propertyServer}
@@ -422,6 +426,7 @@ func (s *server) Serve() run.StopNotify {
databasev1.RegisterSnapshotServiceServer(s.ser, s)
databasev1.RegisterPropertyRegistryServiceServer(s.ser,
s.propertyRegistryServer)
databasev1.RegisterTraceRegistryServiceServer(s.ser,
s.traceRegistryServer)
+ databasev1.RegisterClusterStateServiceServer(s.ser, s)
grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
s.stopCh = make(chan struct{})
@@ -538,6 +543,28 @@ func (s *server) calculateGrpcBufferSizes() (int32, int32)
{
return connWindowSize, streamWindowSize
}
+// GetClusterState returns the current state of all nodes in the cluster
organized by route tables.
+func (s *server) GetClusterState(_ context.Context, _
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse,
error) {
+ if s.routeTableProviders == nil {
+ return &databasev1.GetClusterStateResponse{RouteTables:
map[string]*databasev1.RouteTable{}}, nil
+ }
+
+ routeTables := make(map[string]*databasev1.RouteTable,
len(s.routeTableProviders))
+ for routeKey, provider := range s.routeTableProviders {
+ if provider == nil {
+ s.log.Warn().Str("routeKey", routeKey).Msg("route table
provider is nil")
+ continue
+ }
+
+ routeTable := provider.GetRouteTable()
+ if routeTable != nil {
+ routeTables[routeKey] = routeTable
+ }
+ }
+
+ return &databasev1.GetClusterStateResponse{RouteTables: routeTables},
nil
+}
+
func (s *server) GracefulStop() {
s.log.Info().Msg("stopping")
if s.tls && s.tlsReloader != nil {
diff --git a/banyand/liaison/grpc/server_test.go
b/banyand/liaison/grpc/server_test.go
index 2b7fb119..4c879bae 100644
--- a/banyand/liaison/grpc/server_test.go
+++ b/banyand/liaison/grpc/server_test.go
@@ -105,14 +105,14 @@ func TestNewServerWithProtector(t *testing.T) {
protectorService := &mockProtector{state: protector.StateLow}
// Create server with protector - should not panic
- server := NewServer(context.Background(), nil, nil, nil, nil,
NodeRegistries{}, nil, protectorService)
+ server := NewServer(context.Background(), nil, nil, nil, nil,
NodeRegistries{}, nil, protectorService, nil)
assert.NotNil(t, server)
}
// TestNewServerWithoutProtector verifies nil protector handling.
func TestNewServerWithoutProtector(t *testing.T) {
// Server creation should not fail with nil protector (fail open)
- server := NewServer(context.Background(), nil, nil, nil, nil,
NodeRegistries{}, nil, nil)
+ server := NewServer(context.Background(), nil, nil, nil, nil,
NodeRegistries{}, nil, nil, nil)
assert.NotNil(t, server)
}
@@ -397,7 +397,7 @@ func setupTestServer(t *testing.T, protectorService
protector.Memory) (string, f
StreamLiaisonNodeRegistry: nr,
PropertyNodeRegistry: nr,
TraceLiaisonNodeRegistry: nr,
- }, metricSvc, protectorService)
+ }, metricSvc, protectorService, nil)
// Configure server - use a fixed port for testing
grpcServer.(*server).host = "localhost"
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 8f6282fd..3a6e5b28 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -357,6 +357,7 @@ func (p *server) initGRPCClient() error {
databasev1.RegisterTopNAggregationRegistryServiceHandlerFromEndpoint(p.grpcCtx,
p.gwMux, p.grpcAddr, opts),
databasev1.RegisterSnapshotServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux,
p.grpcAddr, opts),
databasev1.RegisterPropertyRegistryServiceHandlerFromEndpoint(p.grpcCtx,
p.gwMux, p.grpcAddr, opts),
+
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux,
p.grpcAddr, opts),
streamv1.RegisterStreamServiceHandlerFromEndpoint(p.grpcCtx,
p.gwMux, p.grpcAddr, opts),
measurev1.RegisterMeasureServiceHandlerFromEndpoint(p.grpcCtx,
p.gwMux, p.grpcAddr, opts),
propertyv1.RegisterPropertyServiceHandlerFromEndpoint(p.grpcCtx, p.gwMux,
p.grpcAddr, opts),
diff --git a/banyand/metadata/dns/dns.go b/banyand/metadata/dns/dns.go
index f3af3d52..2664f5df 100644
--- a/banyand/metadata/dns/dns.go
+++ b/banyand/metadata/dns/dns.go
@@ -49,7 +49,7 @@ type Service struct {
closer *run.Closer
log *logger.Logger
metrics *metrics
- handlers map[string]schema.EventHandler
+ handlers []schema.EventHandler
lastSuccessfulDNS map[string][]string
srvAddresses []string
caCertPaths []string
@@ -110,7 +110,7 @@ func NewService(cfg Config) (*Service, error) {
tlsEnabled: cfg.TLSEnabled,
caCertPaths: cfg.CACertPaths,
nodeCache: make(map[string]*databasev1.Node),
- handlers: make(map[string]schema.EventHandler),
+ handlers: make([]schema.EventHandler, 0),
lastSuccessfulDNS: make(map[string][]string),
pathToReloader: make(map[string]*pkgtls.Reloader),
srvAddrToPath: make(map[string]string),
@@ -532,7 +532,7 @@ func (s *Service) RegisterHandler(name string, handler
schema.EventHandler) {
s.handlersMutex.Lock()
defer s.handlersMutex.Unlock()
- s.handlers[name] = handler
+ s.handlers = append(s.handlers, handler)
s.log.Debug().Str("handler", name).Msg("Registered DNS node discovery
handler")
}
diff --git a/banyand/property/gossip/message.go
b/banyand/property/gossip/message.go
index b6b3c9a5..30a5969e 100644
--- a/banyand/property/gossip/message.go
+++ b/banyand/property/gossip/message.go
@@ -24,6 +24,7 @@ import (
"google.golang.org/grpc"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -60,6 +61,7 @@ type MessageClient interface {
// MessageServer is an interface that defines methods for subscribing to
topics and receiving messages in a gossip protocol.
type MessageServer interface {
run.Unit
+ route.TableProvider
// Subscribe allows subscribing to a topic to receive messages.
Subscribe(listener MessageListener)
// RegisterServices registers the gRPC services with the provided
server.
diff --git a/banyand/property/gossip/service.go
b/banyand/property/gossip/service.go
index db44a65d..be377d53 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -254,6 +254,28 @@ func (s *service) Serve(stopCh chan struct{}) {
}()
}
+// GetRouteTable implements the route.TableProvider interface.
+// The gossip messenger does not track per-node health states, so only the
+// registered nodes are reported; Active and Evictable are always empty.
+func (s *service) GetRouteTable() *databasev1.RouteTable {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ registered := make([]*databasev1.Node, 0, len(s.registered))
+
+ for _, node := range s.registered {
+ if node != nil {
+ registered = append(registered, node)
+ }
+ }
+
+ return &databasev1.RouteTable{
+ Registered: registered,
+ Active: []string{},
+ Evictable: []string{},
+ }
+}
+
func (s *service) GracefulStop() {
if s.ser == nil {
return
diff --git a/banyand/property/property.go b/banyand/property/property.go
index b58931d9..38948aaa 100644
--- a/banyand/property/property.go
+++ b/banyand/property/property.go
@@ -23,6 +23,7 @@ import (
"strings"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -32,6 +33,7 @@ type Service interface {
run.PreRunner
run.Config
run.Service
+ route.TableProvider
GetGossIPGrpcPort() *uint32
}
diff --git a/banyand/property/service.go b/banyand/property/service.go
index bb82829d..275053e6 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -199,6 +199,14 @@ func (s *service) GetGossIPGrpcPort() *uint32 {
return s.gossipMessenger.GetServerPort()
}
+// GetRouteTable implements route.TableProvider by delegating to the
gossipMessenger; it returns nil when no gossipMessenger is set.
+func (s *service) GetRouteTable() *databasev1.RouteTable {
+ if s.gossipMessenger == nil {
+ return nil
+ }
+ return s.gossipMessenger.GetRouteTable()
+}
+
// NewService returns a new service.
func NewService(metadata metadata.Repo, pipeline queue.Server, pipelineClient
queue.Client, omr observability.MetricsRegistry, pm protector.Memory) (Service,
error) {
return &service{
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 455b91af..a83343b8 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -24,6 +24,8 @@ import (
"time"
"github.com/apache/skywalking-banyandb/api/common"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -96,6 +98,9 @@ func (*local) GetPort() *uint32 {
return nil
}
+func (*local) SetRouteProviders(_ map[string]route.TableProvider) {
+}
+
func (*local) Register(bus.Topic, schema.EventHandler) {
}
@@ -103,6 +108,16 @@ func (*local) HealthyNodes() []string {
return nil
}
+// GetRouteTable returns an empty route table for local queue.
+// Local queue doesn't have distributed nodes, so all fields are empty.
+func (*local) GetRouteTable() *databasev1.RouteTable {
+ return &databasev1.RouteTable{
+ Registered: []*databasev1.Node{},
+ Active: []string{},
+ Evictable: []string{},
+ }
+}
+
type localBatchPublisher struct {
ctx context.Context
local *bus.Bus
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 26d5320f..2ce624ab 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -319,6 +319,40 @@ func (p *pub) Publish(_ context.Context, topic bus.Topic,
messages ...bus.Messag
return p.publish(15*time.Second, topic, messages...)
}
+// GetRouteTable implements the route.TableProvider interface.
+// It returns all registered nodes and the names of active/evictable ones.
+func (p *pub) GetRouteTable() *databasev1.RouteTable {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ registered := make([]*databasev1.Node, 0, len(p.registered))
+ for _, node := range p.registered {
+ if node != nil {
+ registered = append(registered, node)
+ }
+ }
+
+ active := make([]string, 0, len(p.active))
+ for nodeID := range p.active {
+ if node := p.registered[nodeID]; node != nil && node.Metadata
!= nil {
+ active = append(active, node.Metadata.Name)
+ }
+ }
+
+ evictable := make([]string, 0, len(p.evictable))
+ for nodeID := range p.evictable {
+ if node := p.registered[nodeID]; node != nil && node.Metadata
!= nil {
+ evictable = append(evictable, node.Metadata.Name)
+ }
+ }
+
+ return &databasev1.RouteTable{
+ Registered: registered,
+ Active: active,
+ Evictable: evictable,
+ }
+}
+
// New returns a new queue client targeting the given node roles.
// If no roles are passed, it defaults to databasev1.Role_ROLE_DATA.
func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index defb57e5..c51dc294 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -46,6 +47,7 @@ type Client interface {
run.Unit
bus.Publisher
bus.Broadcaster
+ route.TableProvider
NewBatchPublisher(timeout time.Duration) BatchPublisher
NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient,
error)
Register(bus.Topic, schema.EventHandler)
@@ -60,6 +62,7 @@ type Server interface {
bus.Subscriber
RegisterChunkedSyncHandler(topic bus.Topic, handler ChunkedSyncHandler)
GetPort() *uint32
+ SetRouteProviders(providers map[string]route.TableProvider)
}
// BatchPublisher is the interface for publishing data in batch.
diff --git a/banyand/queue/sub/node.go b/banyand/queue/sub/node.go
index 3ddd5d2a..b1e0b697 100644
--- a/banyand/queue/sub/node.go
+++ b/banyand/queue/sub/node.go
@@ -28,3 +28,27 @@ func (s *server) GetCurrentNode(context.Context,
*databasev1.GetCurrentNodeReque
Node: s.curNode,
}, nil
}
+
+func (s *server) GetClusterState(context.Context,
*databasev1.GetClusterStateRequest) (*databasev1.GetClusterStateResponse,
error) {
+ s.routeTableProviderMu.RLock()
+ defer s.routeTableProviderMu.RUnlock()
+
+ if s.routeTableProvider == nil {
+ return &databasev1.GetClusterStateResponse{RouteTables:
map[string]*databasev1.RouteTable{}}, nil
+ }
+
+ routeTables := make(map[string]*databasev1.RouteTable,
len(s.routeTableProvider))
+ for routeKey, provider := range s.routeTableProvider {
+ if provider == nil {
+ s.log.Warn().Str("routeKey", routeKey).Msg("route table
provider is nil")
+ continue
+ }
+
+ routeTable := provider.GetRouteTable()
+ if routeTable != nil {
+ routeTables[routeKey] = routeTable
+ }
+ }
+
+ return &databasev1.GetClusterStateResponse{RouteTables: routeTables},
nil
+}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index c2d45922..25ebc76d 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -47,6 +47,7 @@ import (
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -71,39 +72,41 @@ var (
)
type server struct {
- clusterv1.UnimplementedServiceServer
clusterv1.UnimplementedChunkedSyncServiceServer
streamv1.UnimplementedStreamServiceServer
databasev1.UnimplementedSnapshotServiceServer
databasev1.UnimplementedNodeQueryServiceServer
- creds credentials.TransportCredentials
- tlsReloader *pkgtls.Reloader
- omr observability.MetricsRegistry
- metrics *metrics
- ser *grpclib.Server
- listeners map[bus.Topic][]bus.MessageListener
- topicMap map[string]bus.Topic
- chunkedSyncHandlers map[bus.Topic]queue.ChunkedSyncHandler
- log *logger.Logger
- httpSrv *http.Server
- curNode *databasev1.Node
- clientCloser context.CancelFunc
- httpAddr string
- addr string
- host string
- certFile string
- keyFile string
- flagNamePrefix string
- maxRecvMsgSize run.Bytes
- listenersLock sync.RWMutex
- port uint32
- httpPort uint32
- tls bool
- // Chunk ordering configuration
- enableChunkReordering bool
- maxChunkBufferSize uint32
+ databasev1.UnimplementedClusterStateServiceServer
+ clusterv1.UnimplementedServiceServer
+ omr observability.MetricsRegistry
+ creds credentials.TransportCredentials
+ curNode *databasev1.Node
+ metrics *metrics
+ ser *grpclib.Server
+ listeners map[bus.Topic][]bus.MessageListener
+ topicMap map[string]bus.Topic
+ chunkedSyncHandlers map[bus.Topic]queue.ChunkedSyncHandler
+ log *logger.Logger
+ httpSrv *http.Server
+ tlsReloader *pkgtls.Reloader
+ clientCloser context.CancelFunc
+ routeTableProvider map[string]route.TableProvider
+ certFile string
+ addr string
+ keyFile string
+ flagNamePrefix string
+ httpAddr string
+ host string
chunkBufferTimeout time.Duration
+ maxRecvMsgSize run.Bytes
+ listenersLock sync.RWMutex
+ routeTableProviderMu sync.RWMutex
+ port uint32
+ httpPort uint32
+ maxChunkBufferSize uint32
maxChunkGapSize uint32
+ tls bool
+ enableChunkReordering bool
}
// NewServer returns a new gRPC server.
@@ -113,7 +116,7 @@ func NewServer(omr observability.MetricsRegistry)
queue.Server {
// NewServerWithPorts returns a new gRPC server with specified ports.
func NewServerWithPorts(omr observability.MetricsRegistry, flagNamePrefix
string, port, httpPort uint32) queue.Server {
- return &server{
+ srv := &server{
listeners: make(map[bus.Topic][]bus.MessageListener),
topicMap: make(map[string]bus.Topic),
chunkedSyncHandlers:
make(map[bus.Topic]queue.ChunkedSyncHandler),
@@ -128,6 +131,7 @@ func NewServerWithPorts(omr observability.MetricsRegistry,
flagNamePrefix string
chunkBufferTimeout: 5 * time.Second,
maxChunkGapSize: 5,
}
+ return srv
}
func (s *server) PreRun(ctx context.Context) error {
@@ -283,6 +287,7 @@ func (s *server) Serve() run.StopNotify {
grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
databasev1.RegisterSnapshotServiceServer(s.ser, s)
databasev1.RegisterNodeQueryServiceServer(s.ser, s)
+ databasev1.RegisterClusterStateServiceServer(s.ser, s)
streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})
tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s})
@@ -308,6 +313,11 @@ func (s *server) Serve() run.StopNotify {
close(stopCh)
return stopCh
}
+ if err :=
databasev1.RegisterClusterStateServiceHandlerFromEndpoint(ctx, gwMux, s.addr,
clientOpts); err != nil {
+ s.log.Error().Err(err).Msg("Failed to register cluster state
service")
+ close(stopCh)
+ return stopCh
+ }
mux := chi.NewRouter()
mux.Mount("/api", http.StripPrefix("/api", gwMux))
s.httpSrv = &http.Server{
@@ -381,6 +391,12 @@ func (s *server) RegisterChunkedSyncHandler(topic
bus.Topic, handler queue.Chunk
s.chunkedSyncHandlers[topic] = handler
}
+func (s *server) SetRouteProviders(providers map[string]route.TableProvider) {
+ s.routeTableProviderMu.Lock()
+ s.routeTableProvider = providers
+ s.routeTableProviderMu.Unlock()
+}
+
type metrics struct {
totalStarted meter.Counter
totalFinished meter.Counter
diff --git a/docs/api-reference.md b/docs/api-reference.md
index adbacc1d..64ed1353 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -165,6 +165,9 @@
- [TagType](#banyandb-database-v1-TagType)
- [banyandb/database/v1/rpc.proto](#banyandb_database_v1_rpc-proto)
+ - [GetClusterStateRequest](#banyandb-database-v1-GetClusterStateRequest)
+ - [GetClusterStateResponse](#banyandb-database-v1-GetClusterStateResponse)
+ -
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
- [GetCurrentNodeRequest](#banyandb-database-v1-GetCurrentNodeRequest)
- [GetCurrentNodeResponse](#banyandb-database-v1-GetCurrentNodeResponse)
-
[GroupRegistryServiceCreateRequest](#banyandb-database-v1-GroupRegistryServiceCreateRequest)
@@ -227,6 +230,7 @@
-
[PropertyRegistryServiceListResponse](#banyandb-database-v1-PropertyRegistryServiceListResponse)
-
[PropertyRegistryServiceUpdateRequest](#banyandb-database-v1-PropertyRegistryServiceUpdateRequest)
-
[PropertyRegistryServiceUpdateResponse](#banyandb-database-v1-PropertyRegistryServiceUpdateResponse)
+ - [RouteTable](#banyandb-database-v1-RouteTable)
- [Snapshot](#banyandb-database-v1-Snapshot)
- [SnapshotRequest](#banyandb-database-v1-SnapshotRequest)
- [SnapshotRequest.Group](#banyandb-database-v1-SnapshotRequest-Group)
@@ -268,6 +272,7 @@
-
[TraceRegistryServiceUpdateRequest](#banyandb-database-v1-TraceRegistryServiceUpdateRequest)
-
[TraceRegistryServiceUpdateResponse](#banyandb-database-v1-TraceRegistryServiceUpdateResponse)
+ - [ClusterStateService](#banyandb-database-v1-ClusterStateService)
- [GroupRegistryService](#banyandb-database-v1-GroupRegistryService)
-
[IndexRuleBindingRegistryService](#banyandb-database-v1-IndexRuleBindingRegistryService)
-
[IndexRuleRegistryService](#banyandb-database-v1-IndexRuleRegistryService)
@@ -2636,6 +2641,47 @@ Type determine the index structure under the hood
+<a name="banyandb-database-v1-GetClusterStateRequest"></a>
+
+### GetClusterStateRequest
+
+
+
+
+
+
+
+<a name="banyandb-database-v1-GetClusterStateResponse"></a>
+
+### GetClusterStateResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| route_tables |
[GetClusterStateResponse.RouteTablesEntry](#banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry)
| repeated | Liaison node: map&#39;s key could be &#34;tire1&#34; and
&#34;tire2&#34;. tire1 routes traffic between liaison nodes, tire2 spreads data
among data nodes. Data node: map&#39;s key could be &#34;property&#34; for
gossip. Lifecycle agent: map's key could be the next stage's name. |
+
+
+
+
+
+
+<a name="banyandb-database-v1-GetClusterStateResponse-RouteTablesEntry"></a>
+
+### GetClusterStateResponse.RouteTablesEntry
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| key | [string](#string) | | |
+| value | [RouteTable](#banyandb-database-v1-RouteTable) | | |
+
+
+
+
+
+
<a name="banyandb-database-v1-GetCurrentNodeRequest"></a>
### GetCurrentNodeRequest
@@ -3530,6 +3576,24 @@ Type determine the index structure under the hood
+<a name="banyandb-database-v1-RouteTable"></a>
+
+### RouteTable
+RouteTable represents a collection of nodes grouped by their health state.
+It provides a view of nodes that are registered, actively healthy, and those
being evicted.
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| registered | [Node](#banyandb-database-v1-Node) | repeated | registered
contains all nodes that have been discovered and registered in this route. |
+| active | [string](#string) | repeated | active contains node names
(Node.Metadata.Name) that are currently healthy and can handle requests. |
+| evictable | [string](#string) | repeated | evictable contains node names
(Node.Metadata.Name) that are unhealthy and being retried before eviction. |
+
+
+
+
+
+
<a name="banyandb-database-v1-Snapshot"></a>
### Snapshot
@@ -4132,6 +4196,16 @@ Type determine the index structure under the hood
+<a name="banyandb-database-v1-ClusterStateService"></a>
+
+### ClusterStateService
+
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| GetClusterState |
[GetClusterStateRequest](#banyandb-database-v1-GetClusterStateRequest) |
[GetClusterStateResponse](#banyandb-database-v1-GetClusterStateResponse) | |
+
+
<a name="banyandb-database-v1-GroupRegistryService"></a>
### GroupRegistryService
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index ff90cb2e..46d8a579 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -24,6 +24,7 @@ import (
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -57,6 +58,10 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate property service")
}
+ pipeline.SetRouteProviders(map[string]route.TableProvider{
+ "property": propertySvc,
+ })
+
streamSvc, err := stream.NewService(metaSvc, pipeline, metricSvc, pm,
propertyStreamPipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index d613115e..a3c19c2c 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -28,6 +28,7 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/dquery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc/route"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -89,12 +90,17 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
l.Fatal().Err(err).Msg("failed to initiate distributed query
service")
}
+ routeProviders := map[string]route.TableProvider{
+ "tire1": tire1Client,
+ "tire2": tire2Client,
+ }
+
grpcServer := grpc.NewServer(ctx, tire1Client, tire2Client,
localPipeline, metaSvc, grpc.NodeRegistries{
MeasureLiaisonNodeRegistry: measureLiaisonNodeRegistry,
StreamLiaisonNodeRegistry:
grpc.NewClusterNodeRegistry(data.TopicStreamWrite, tire1Client,
streamLiaisonNodeSel),
PropertyNodeRegistry:
grpc.NewClusterNodeRegistry(data.TopicPropertyUpdate, tire2Client,
propertyNodeSel),
TraceLiaisonNodeRegistry:
grpc.NewClusterNodeRegistry(data.TopicTraceWrite, tire1Client,
traceLiaisonNodeSel),
- }, metricSvc, pm)
+ }, metricSvc, pm, routeProviders)
profSvc := observability.NewProfService()
httpServer := http.NewServer(grpcServer.GetAuthReloader())
var units []run.Unit
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 14d80fbb..5fb8c42e 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -82,7 +82,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
StreamLiaisonNodeRegistry: nr,
PropertyNodeRegistry: nr,
TraceLiaisonNodeRegistry: nr,
- }, metricSvc, pm)
+ }, metricSvc, pm, nil)
profSvc := observability.NewProfService()
httpServer := http.NewServer(grpcServer.GetAuthReloader())
diff --git
a/test/integration/distributed/cluster_state/cluster_state_suite_test.go
b/test/integration/distributed/cluster_state/cluster_state_suite_test.go
new file mode 100644
index 00000000..0449e5a5
--- /dev/null
+++ b/test/integration/distributed/cluster_state/cluster_state_suite_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cluster_state_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestClusterState(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Distributed Get Cluster State Suite")
+}
+
+var (
+ dataConnection *grpc.ClientConn
+ liaisonConnection *grpc.ClientConn
+ srcDir string
+ deferFunc func()
+ goods []gleak.Goroutine
+ dataAddr string
+ liaisonAddr string
+ ep string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ goods = gleak.Goroutines()
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ var spaceDef func()
+ srcDir, spaceDef, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(srcDir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{ep}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer schemaRegistry.Close()
+ By("Starting data node")
+ var closeDataNode0 func()
+ dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep,
"--property-repair-enabled=true")
+ By("Starting liaison node")
+ var closerLiaisonNode func()
+ liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep)
+ time.Sleep(flags.ConsistentlyTimeout)
+ deferFunc = func() {
+ closerLiaisonNode()
+ closeDataNode0()
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+ liaisonConnection, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+ dataConnection, err = grpchelper.Conn(dataAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+ return nil
+}, func(_ []byte) {
+})
+
+var _ = Describe("ClusterState API", func() {
+ It("Check cluster state", func() {
+ client :=
databasev1.NewClusterStateServiceClient(dataConnection)
+ state, err := client.GetClusterState(context.Background(),
&databasev1.GetClusterStateRequest{})
+ Expect(err).NotTo(HaveOccurred())
+ Expect(state.GetRouteTables()).To(HaveKey("property"))
+ client =
databasev1.NewClusterStateServiceClient(liaisonConnection)
+ state, err = client.GetClusterState(context.Background(),
&databasev1.GetClusterStateRequest{})
+ Expect(err).NotTo(HaveOccurred())
+ Expect(state.GetRouteTables()).To(HaveKey("tire1"))
+ Expect(state.GetRouteTables()).To(HaveKey("tire2"))
+ })
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if dataConnection != nil {
+ Expect(dataConnection.Close()).To(Succeed())
+ }
+ if liaisonConnection != nil {
+ Expect(liaisonConnection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Lifecycle Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ }
+})