Copilot commented on code in PR #979:
URL:
https://github.com/apache/skywalking-banyandb/pull/979#discussion_r2846183976
##########
pkg/meter/native/provider.go:
##########
@@ -53,55 +54,90 @@ type NodeInfo struct {
HTTPAddress string
}
+type pendingMeasure struct {
+ name string
+ labels []string
+}
+
type provider struct {
- metadata metadata.Repo
- scope meter.Scope
- nodeInfo NodeInfo
+ metadata metadata.Repo
+ scope meter.Scope
+ nodeInfo NodeInfo
+ pendingMeasures []pendingMeasure
+ mu sync.Mutex
+ initialized atomic.Bool
}
// NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
- p := &provider{
+func NewProvider(scope meter.Scope, metadata metadata.Repo, nodeInfo NodeInfo)
meter.Provider {
+ return &provider{
scope: scope,
metadata: metadata,
nodeInfo: nodeInfo,
}
- err := p.createNativeObservabilityGroup(ctx)
- if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
- log.Error().Err(err).Msg("Failed to create native observability
group")
+}
+
+// InitSchema creates the native observability group and all pending measures
for the given provider.
+// It should be called after the metadata service is ready (during Serve
phase).
+func InitSchema(ctx context.Context, p meter.Provider) {
+ np, ok := p.(*provider)
+ if !ok {
+ return
+ }
+ np.initialized.Store(true)
+ groupErr := np.createNativeObservabilityGroup(ctx)
+ if groupErr != nil && !errors.Is(groupErr, schema.ErrGRPCAlreadyExists)
{
+ log.Error().Err(groupErr).Msg("Failed to create native
observability group")
+ }
Review Comment:
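`InitSchema` sets `initialized` to true before
`createNativeObservabilityGroup` runs and before `pendingMeasures` is flushed. If a
metric is registered concurrently after `initialized` flips, `registerOrDefer`
may try to create measures before the observability group exists, causing
avoidable failures. Consider setting `initialized` only after the group is
created (and/or after pending measures are processed).

If the flag is moved after the flush, make the hand-off atomic:
- Flip `initialized` while holding `mu`.
- Have `registerOrDefer` re-check the flag under `mu` before appending.

`registerOrDefer` currently reads the flag outside the lock. Without this, a measure deferred between the snapshot and the flip stays in `pendingMeasures` and is never created.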
##########
scripts/build/lint.mk:
##########
@@ -22,7 +22,7 @@ include $(mk_dir)lint-bin.mk
.PHONY: lint
lint: $(LINTER) $(REVIVE) ## Run all linters
- $(LINTER) run -v --config $(root_dir)/.golangci.yml --timeout 10m ./...
&& \
+ $(LINTER) run -v --config $(root_dir)/.golangci.yml --timeout 10m ./...
--fix && \
$(REVIVE) -config $(root_dir)/revive.toml -formatter friendly ./...
Review Comment:
In the `lint` target, `--fix` is placed after the package arg (`./...`).
`golangci-lint run` expects flags before package patterns, so this can be
interpreted as an extra package and break the target. Consider moving `--fix`
before `./...` (similar to the `format` target).
##########
banyand/metadata/schema/property/client.go:
##########
@@ -0,0 +1,1300 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.uber.org/multierr"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/api/validate"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+ // DefaultGRPCTimeout is the default timeout for gRPC calls to schema
servers.
+ DefaultGRPCTimeout = 5 * time.Second
+ // DefaultSyncInterval is the default polling interval for
property-based schema sync.
+ DefaultSyncInterval = 30 * time.Second
+ // DefaultInitWaitTime is the default maximum time to wait for at least
one schema server to become active during initialization.
+ DefaultInitWaitTime = time.Minute
+)
+
+var _ schema.Registry = (*SchemaRegistry)(nil)
+
+var errNoActiveServers = fmt.Errorf("no active schema servers available")
+
+// ClientConfig holds configuration for the property-based schema registry
client.
+type ClientConfig struct {
+ OMR observability.MetricsRegistry
+ NodeRegistry schema.Node
+ CurNode *databasev1.Node
+ CACertPath string
+ GRPCTimeout time.Duration
+ SyncInterval time.Duration
+ InitWaitTime time.Duration
+ FullReconcileEvery uint64
+ MaxRecvMsgSize int
+ TLSEnabled bool
+}
+
+// SchemaRegistry implements schema.Registry using property-based schema
servers.
+type SchemaRegistry struct {
+ connMgr *grpchelper.ConnManager[*schemaClient]
+ closer *run.Closer
+ l *logger.Logger
+ cache *schemaCache
+ caCertReloader *pkgtls.Reloader
+ handlers map[schema.Kind][]schema.EventHandler
+ timeout time.Duration
+ syncInterval time.Duration
+ fullReconcileEvery uint64
+ syncRound uint64
+ mux sync.RWMutex
+}
+
+// NewSchemaRegistryClient creates a new property-based schema registry client.
+func NewSchemaRegistryClient(cfg *ClientConfig) (*SchemaRegistry, error) {
+ l := logger.GetLogger("property-schema-registry")
+ var caCertReloader *pkgtls.Reloader
+ if cfg.TLSEnabled && cfg.CACertPath != "" {
+ var reloaderErr error
+ caCertReloader, reloaderErr =
pkgtls.NewClientCertReloader(cfg.CACertPath, l)
+ if reloaderErr != nil {
+ return nil, fmt.Errorf("failed to initialize CA
certificate reloader: %w", reloaderErr)
+ }
+ }
+ handler := &connectionHandler{
+ l: l,
+ caCertReloader: caCertReloader,
+ tlsEnabled: cfg.TLSEnabled,
+ }
+ connMgr :=
grpchelper.NewConnManager[*schemaClient](grpchelper.ConnManagerConfig[*schemaClient]{
+ Handler: handler,
+ Logger: l,
+ MaxRecvMsgSize: cfg.MaxRecvMsgSize,
+ })
+ timeout := cfg.GRPCTimeout
+ if timeout == 0 {
+ timeout = DefaultGRPCTimeout
+ }
+ syncInterval := cfg.SyncInterval
+ if syncInterval == 0 {
+ syncInterval = DefaultSyncInterval
+ }
+ initWaitTime := cfg.InitWaitTime
+ if initWaitTime == 0 {
+ initWaitTime = DefaultInitWaitTime
+ }
+ fullReconcileEvery := cfg.FullReconcileEvery
+ if fullReconcileEvery == 0 {
+ fullReconcileEvery = 5
+ }
+ reg := &SchemaRegistry{
+ connMgr: connMgr,
+ closer: run.NewCloser(1),
+ l: l,
+ cache: newSchemaCache(),
+ caCertReloader: caCertReloader,
+ handlers: make(map[schema.Kind][]schema.EventHandler),
+ timeout: timeout,
+ syncInterval: syncInterval,
+ fullReconcileEvery: fullReconcileEvery,
+ }
+
+ if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
+ connMgr.OnAddOrUpdate(cfg.CurNode)
+ } else if cfg.NodeRegistry != nil {
+ var nodesAdded bool
+ nodes, listErr :=
cfg.NodeRegistry.ListNode(context.Background(), databasev1.Role_ROLE_META)
+ if listErr != nil {
+ return nil, fmt.Errorf("failed to list meta nodes: %w",
listErr)
+ }
+ for _, node := range nodes {
+ if isPropertySchemaNode(node) {
+ connMgr.OnAddOrUpdate(node)
+ nodesAdded = true
+ }
+ }
+ if !nodesAdded {
+ _ = reg.Close()
+ return nil, fmt.Errorf("no property schema nodes found
among %d meta nodes", len(nodes))
+ }
+ }
+
+ // Wait for at least one schema server to become active.
+ // OnAddOrUpdate health-checks synchronously; if it fails, a background
+ // retry goroutine retries backoff. Poll until active or timeout.
+ if connMgr.ActiveCount() == 0 {
+ waitDeadline := time.Now().Add(initWaitTime)
+ for connMgr.ActiveCount() == 0 &&
time.Now().Before(waitDeadline) {
+ time.Sleep(500 * time.Millisecond)
+ }
+ if connMgr.ActiveCount() == 0 {
+ _ = reg.Close()
+ return nil, fmt.Errorf("no schema servers reachable
after %s", timeout)
Review Comment:
This initialization failure message uses `timeout` (the gRPC timeout)
instead of the actual wait deadline (`initWaitTime`) used in the loop. This can
be misleading when diagnosing startup issues; consider reporting `initWaitTime`
(or the elapsed time) here.
```suggestion
return nil, fmt.Errorf("no schema servers reachable after %s", initWaitTime)
```
##########
banyand/metadata/service/server.go:
##########
@@ -77,6 +81,8 @@ func (s *server) FlagSet() *run.FlagSet {
fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url",
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url",
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota
for backend storage")
+ fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode",
metadata.NodeDiscoveryModeEtcd,
+ "Node discovery mode: 'etcd' for etcd-based, 'dns' for
DNS-based, 'file' for file-based")
if s.propServer != nil {
Review Comment:
`FlagSet()` defines `--node-discovery-mode` on this server FlagSet, but the
embedded `s.Service.FlagSet()` (metadata client) also defines the same flag.
Adding both flagsets will likely cause a duplicate-flag error/panic at startup.
Consider removing this flag from the outer server FlagSet and relying on the
embedded service flag (or rename one of them).
##########
pkg/meter/native/provider.go:
##########
@@ -53,55 +54,90 @@ type NodeInfo struct {
HTTPAddress string
}
+type pendingMeasure struct {
+ name string
+ labels []string
+}
+
type provider struct {
- metadata metadata.Repo
- scope meter.Scope
- nodeInfo NodeInfo
+ metadata metadata.Repo
+ scope meter.Scope
+ nodeInfo NodeInfo
+ pendingMeasures []pendingMeasure
+ mu sync.Mutex
+ initialized atomic.Bool
}
// NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
- p := &provider{
+func NewProvider(scope meter.Scope, metadata metadata.Repo, nodeInfo NodeInfo)
meter.Provider {
+ return &provider{
scope: scope,
metadata: metadata,
nodeInfo: nodeInfo,
}
- err := p.createNativeObservabilityGroup(ctx)
- if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
- log.Error().Err(err).Msg("Failed to create native observability
group")
+}
+
+// InitSchema creates the native observability group and all pending measures
for the given provider.
+// It should be called after the metadata service is ready (during Serve
phase).
+func InitSchema(ctx context.Context, p meter.Provider) {
+ np, ok := p.(*provider)
+ if !ok {
+ return
+ }
+ np.initialized.Store(true)
+ groupErr := np.createNativeObservabilityGroup(ctx)
+ if groupErr != nil && !errors.Is(groupErr, schema.ErrGRPCAlreadyExists)
{
+ log.Error().Err(groupErr).Msg("Failed to create native
observability group")
+ }
+ np.mu.Lock()
+ pending := np.pendingMeasures
+ np.pendingMeasures = nil
+ np.mu.Unlock()
+ for _, pm := range pending {
+ _, measureErr := np.createMeasure(ctx, pm.name, pm.labels...)
+ if measureErr != nil && !errors.Is(measureErr,
schema.ErrGRPCAlreadyExists) {
+ log.Error().Err(measureErr).Msgf("Failed to create
measure %s", pm.name)
+ }
}
- return p
}
// Counter returns a native implementation of the Counter interface.
func (p *provider) Counter(name string, labelNames ...string) meter.Counter {
- name, err := p.createMeasure(name, labelNames...)
- if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
- log.Error().Err(err).Msgf("Failure to createMeasure for Counter
%s, labels: %v", name, labelNames)
- }
+ p.registerOrDefer(name, labelNames)
return &Counter{
newMetricVec(name, p.scope, p.nodeInfo),
}
}
-// Gauge returns a nativeimplementation of the Gauge interface.
+// Gauge returns a native implementation of the Gauge interface.
func (p *provider) Gauge(name string, labelNames ...string) meter.Gauge {
- name, err := p.createMeasure(name, labelNames...)
- if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
- log.Error().Err(err).Msgf("Failure to createMeasure for Gauge
%s, labels: %v", name, labelNames)
- }
+ p.registerOrDefer(name, labelNames)
return &Gauge{
newMetricVec(name, p.scope, p.nodeInfo),
}
}
// Histogram returns a native implementation of the Histogram interface.
-func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string)
meter.Histogram {
+func (p *provider) Histogram(name string, _ meter.Buckets, labelNames
...string) meter.Histogram {
+ p.registerOrDefer(name, labelNames)
return &Histogram{
newMetricVec(name, p.scope, p.nodeInfo),
}
}
+func (p *provider) registerOrDefer(name string, labels []string) {
+ if p.initialized.Load() {
+ _, measureErr := p.createMeasure(context.Background(), name,
labels...)
+ if measureErr != nil && !errors.Is(measureErr,
schema.ErrGRPCAlreadyExists) {
+ log.Error().Err(measureErr).Msgf("Failed to create
measure %s", name)
+ }
+ return
+ }
+ p.mu.Lock()
+ p.pendingMeasures = append(p.pendingMeasures, pendingMeasure{name:
name, labels: labels})
+ p.mu.Unlock()
Review Comment:
`registerOrDefer` calls `createMeasure(context.Background(), ...)` without
any timeout or cancellation. This can block the caller (the metric creation path)
indefinitely if the metadata service is slow or unavailable. Previously this path
used a timeout; consider wrapping the background context with a bounded timeout
(or reusing a configured timeout). Also copy `labels` when enqueuing. The
deferred entry otherwise aliases the caller's variadic slice, so later mutations
by the caller would change the queued labels.
##########
banyand/observability/services/meter_native.go:
##########
@@ -19,20 +19,32 @@ package services
import (
"context"
- "time"
+ "sync"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/meter/native"
)
type nativeProviderFactory struct {
- metadata metadata.Repo
- nodeInfo native.NodeInfo
+ metadata metadata.Repo
+ nodeInfo native.NodeInfo
+ providers []meter.Provider
+ mu sync.Mutex
}
-func (f nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- return native.NewProvider(ctx, scope, f.metadata, f.nodeInfo)
+func (f *nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
+ p := native.NewProvider(scope, f.metadata, f.nodeInfo)
+ f.mu.Lock()
+ f.providers = append(f.providers, p)
+ f.mu.Unlock()
+ return p
+}
+
+func (f *nativeProviderFactory) initAllSchemas(ctx context.Context) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ for _, p := range f.providers {
Review Comment:
`initAllSchemas` holds `f.mu` while calling `native.InitSchema` for each
provider. `InitSchema` performs RPCs and can block; keeping the factory mutex
held that long can stall concurrent provider creation and risks lock
contention. Consider snapshotting `f.providers` under the lock, releasing it,
then iterating/calling `InitSchema` without holding the mutex.
```suggestion
providers := append([]meter.Provider(nil), f.providers...)
f.mu.Unlock()
for _, p := range providers {
```
##########
banyand/metadata/service/server.go:
##########
@@ -194,6 +212,30 @@ func NewService(_ context.Context) (metadata.Service,
error) {
return s, nil
}
+func (s *server) enrichContextWithSchemaAddress(ctx context.Context)
context.Context {
+ port := s.propServer.GetPort()
+ if port == nil {
+ return ctx
+ }
+ val := ctx.Value(common.ContextNodeKey)
+ if val == nil {
+ return ctx
+ }
+ node := val.(common.Node)
Review Comment:
`enrichContextWithSchemaAddress` does a direct type assertion
`val.(common.Node)` which will panic if the context value isn’t the expected
type. Since this function is on the startup path, consider using the `node, ok
:= val.(common.Node)` form and returning `ctx` when the assertion fails.
```suggestion
node, ok := val.(common.Node)
if !ok {
return ctx
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]