Copilot commented on code in PR #979:
URL: 
https://github.com/apache/skywalking-banyandb/pull/979#discussion_r2846183976


##########
pkg/meter/native/provider.go:
##########
@@ -53,55 +54,90 @@ type NodeInfo struct {
        HTTPAddress string
 }
 
+type pendingMeasure struct {
+       name   string
+       labels []string
+}
+
 type provider struct {
-       metadata metadata.Repo
-       scope    meter.Scope
-       nodeInfo NodeInfo
+       metadata        metadata.Repo
+       scope           meter.Scope
+       nodeInfo        NodeInfo
+       pendingMeasures []pendingMeasure
+       mu              sync.Mutex
+       initialized     atomic.Bool
 }
 
 // NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata 
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
-       p := &provider{
+func NewProvider(scope meter.Scope, metadata metadata.Repo, nodeInfo NodeInfo) 
meter.Provider {
+       return &provider{
                scope:    scope,
                metadata: metadata,
                nodeInfo: nodeInfo,
        }
-       err := p.createNativeObservabilityGroup(ctx)
-       if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
-               log.Error().Err(err).Msg("Failed to create native observability 
group")
+}
+
+// InitSchema creates the native observability group and all pending measures 
for the given provider.
+// It should be called after the metadata service is ready (during Serve 
phase).
+func InitSchema(ctx context.Context, p meter.Provider) {
+       np, ok := p.(*provider)
+       if !ok {
+               return
+       }
+       np.initialized.Store(true)
+       groupErr := np.createNativeObservabilityGroup(ctx)
+       if groupErr != nil && !errors.Is(groupErr, schema.ErrGRPCAlreadyExists) 
{
+               log.Error().Err(groupErr).Msg("Failed to create native 
observability group")
+       }

Review Comment:
   `InitSchema` sets `initialized` to true before 
`createNativeObservabilityGroup` and before flushing `pendingMeasures`. If a 
metric is registered concurrently after `initialized` flips, `registerOrDefer` 
may try to create measures before the observability group exists, causing 
avoidable failures. Consider setting `initialized` only after the group is 
created (and/or after pending measures are processed).



##########
scripts/build/lint.mk:
##########
@@ -22,7 +22,7 @@ include $(mk_dir)lint-bin.mk
 
 .PHONY: lint
 lint: $(LINTER) $(REVIVE) ## Run all linters
-       $(LINTER) run -v --config $(root_dir)/.golangci.yml --timeout 10m ./... 
&& \
+       $(LINTER) run -v --config $(root_dir)/.golangci.yml --timeout 10m ./... 
--fix && \
          $(REVIVE) -config $(root_dir)/revive.toml -formatter friendly ./...

Review Comment:
   In the `lint` target, `--fix` is placed after the package arg (`./...`). 
`golangci-lint run` expects flags before package patterns, so this can be 
interpreted as an extra package and break the target. Consider moving `--fix` 
before `./...` (similar to the `format` target).



##########
banyand/metadata/schema/property/client.go:
##########
@@ -0,0 +1,1300 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "go.uber.org/multierr"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/api/validate"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       pkgtls "github.com/apache/skywalking-banyandb/pkg/tls"
+)
+
+const (
+       // DefaultGRPCTimeout is the default timeout for gRPC calls to schema 
servers.
+       DefaultGRPCTimeout = 5 * time.Second
+       // DefaultSyncInterval is the default polling interval for 
property-based schema sync.
+       DefaultSyncInterval = 30 * time.Second
+       // DefaultInitWaitTime is the default maximum time to wait for at least 
one schema server to become active during initialization.
+       DefaultInitWaitTime = time.Minute
+)
+
+var _ schema.Registry = (*SchemaRegistry)(nil)
+
+var errNoActiveServers = fmt.Errorf("no active schema servers available")
+
+// ClientConfig holds configuration for the property-based schema registry 
client.
+type ClientConfig struct {
+       OMR                observability.MetricsRegistry
+       NodeRegistry       schema.Node
+       CurNode            *databasev1.Node
+       CACertPath         string
+       GRPCTimeout        time.Duration
+       SyncInterval       time.Duration
+       InitWaitTime       time.Duration
+       FullReconcileEvery uint64
+       MaxRecvMsgSize     int
+       TLSEnabled         bool
+}
+
+// SchemaRegistry implements schema.Registry using property-based schema 
servers.
+type SchemaRegistry struct {
+       connMgr            *grpchelper.ConnManager[*schemaClient]
+       closer             *run.Closer
+       l                  *logger.Logger
+       cache              *schemaCache
+       caCertReloader     *pkgtls.Reloader
+       handlers           map[schema.Kind][]schema.EventHandler
+       timeout            time.Duration
+       syncInterval       time.Duration
+       fullReconcileEvery uint64
+       syncRound          uint64
+       mux                sync.RWMutex
+}
+
+// NewSchemaRegistryClient creates a new property-based schema registry client.
+func NewSchemaRegistryClient(cfg *ClientConfig) (*SchemaRegistry, error) {
+       l := logger.GetLogger("property-schema-registry")
+       var caCertReloader *pkgtls.Reloader
+       if cfg.TLSEnabled && cfg.CACertPath != "" {
+               var reloaderErr error
+               caCertReloader, reloaderErr = 
pkgtls.NewClientCertReloader(cfg.CACertPath, l)
+               if reloaderErr != nil {
+                       return nil, fmt.Errorf("failed to initialize CA 
certificate reloader: %w", reloaderErr)
+               }
+       }
+       handler := &connectionHandler{
+               l:              l,
+               caCertReloader: caCertReloader,
+               tlsEnabled:     cfg.TLSEnabled,
+       }
+       connMgr := 
grpchelper.NewConnManager[*schemaClient](grpchelper.ConnManagerConfig[*schemaClient]{
+               Handler:        handler,
+               Logger:         l,
+               MaxRecvMsgSize: cfg.MaxRecvMsgSize,
+       })
+       timeout := cfg.GRPCTimeout
+       if timeout == 0 {
+               timeout = DefaultGRPCTimeout
+       }
+       syncInterval := cfg.SyncInterval
+       if syncInterval == 0 {
+               syncInterval = DefaultSyncInterval
+       }
+       initWaitTime := cfg.InitWaitTime
+       if initWaitTime == 0 {
+               initWaitTime = DefaultInitWaitTime
+       }
+       fullReconcileEvery := cfg.FullReconcileEvery
+       if fullReconcileEvery == 0 {
+               fullReconcileEvery = 5
+       }
+       reg := &SchemaRegistry{
+               connMgr:            connMgr,
+               closer:             run.NewCloser(1),
+               l:                  l,
+               cache:              newSchemaCache(),
+               caCertReloader:     caCertReloader,
+               handlers:           make(map[schema.Kind][]schema.EventHandler),
+               timeout:            timeout,
+               syncInterval:       syncInterval,
+               fullReconcileEvery: fullReconcileEvery,
+       }
+
+       if cfg.CurNode != nil && isPropertySchemaNode(cfg.CurNode) {
+               connMgr.OnAddOrUpdate(cfg.CurNode)
+       } else if cfg.NodeRegistry != nil {
+               var nodesAdded bool
+               nodes, listErr := 
cfg.NodeRegistry.ListNode(context.Background(), databasev1.Role_ROLE_META)
+               if listErr != nil {
+                       return nil, fmt.Errorf("failed to list meta nodes: %w", 
listErr)
+               }
+               for _, node := range nodes {
+                       if isPropertySchemaNode(node) {
+                               connMgr.OnAddOrUpdate(node)
+                               nodesAdded = true
+                       }
+               }
+               if !nodesAdded {
+                       _ = reg.Close()
+                       return nil, fmt.Errorf("no property schema nodes found 
among %d meta nodes", len(nodes))
+               }
+       }
+
+       // Wait for at least one schema server to become active.
+       // OnAddOrUpdate health-checks synchronously; if it fails, a background
+       // retry goroutine retries backoff. Poll until active or timeout.
+       if connMgr.ActiveCount() == 0 {
+               waitDeadline := time.Now().Add(initWaitTime)
+               for connMgr.ActiveCount() == 0 && 
time.Now().Before(waitDeadline) {
+                       time.Sleep(500 * time.Millisecond)
+               }
+               if connMgr.ActiveCount() == 0 {
+                       _ = reg.Close()
+                       return nil, fmt.Errorf("no schema servers reachable 
after %s", timeout)

Review Comment:
   This initialization failure message uses `timeout` (the gRPC timeout) 
instead of the actual wait deadline (`initWaitTime`) used in the loop. This can 
be misleading when diagnosing startup issues; consider reporting `initWaitTime` 
(or the elapsed time) here.
   ```suggestion
                        return nil, fmt.Errorf("no schema servers reachable 
after %s", initWaitTime)
   ```



##########
banyand/metadata/service/server.go:
##########
@@ -77,6 +81,8 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.StringSliceVar(&s.listenClientURL, "etcd-listen-client-url", 
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
        fs.StringSliceVar(&s.listenPeerURL, "etcd-listen-peer-url", 
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
        fs.VarP(&s.quotaBackendBytes, "etcd-quota-backend-bytes", "", "Quota 
for backend storage")
+       fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", 
metadata.NodeDiscoveryModeEtcd,
+               "Node discovery mode: 'etcd' for etcd-based, 'dns' for 
DNS-based, 'file' for file-based")
        if s.propServer != nil {

Review Comment:
   `FlagSet()` defines `--node-discovery-mode` on this server FlagSet, but the 
embedded `s.Service.FlagSet()` (metadata client) also defines the same flag. 
Adding both flagsets will likely cause a duplicate-flag error/panic at startup. 
Consider removing this flag from the outer server FlagSet and relying on the 
embedded service flag (or rename one of them).



##########
pkg/meter/native/provider.go:
##########
@@ -53,55 +54,90 @@ type NodeInfo struct {
        HTTPAddress string
 }
 
+type pendingMeasure struct {
+       name   string
+       labels []string
+}
+
 type provider struct {
-       metadata metadata.Repo
-       scope    meter.Scope
-       nodeInfo NodeInfo
+       metadata        metadata.Repo
+       scope           meter.Scope
+       nodeInfo        NodeInfo
+       pendingMeasures []pendingMeasure
+       mu              sync.Mutex
+       initialized     atomic.Bool
 }
 
 // NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata 
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
-       p := &provider{
+func NewProvider(scope meter.Scope, metadata metadata.Repo, nodeInfo NodeInfo) 
meter.Provider {
+       return &provider{
                scope:    scope,
                metadata: metadata,
                nodeInfo: nodeInfo,
        }
-       err := p.createNativeObservabilityGroup(ctx)
-       if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
-               log.Error().Err(err).Msg("Failed to create native observability 
group")
+}
+
+// InitSchema creates the native observability group and all pending measures 
for the given provider.
+// It should be called after the metadata service is ready (during Serve 
phase).
+func InitSchema(ctx context.Context, p meter.Provider) {
+       np, ok := p.(*provider)
+       if !ok {
+               return
+       }
+       np.initialized.Store(true)
+       groupErr := np.createNativeObservabilityGroup(ctx)
+       if groupErr != nil && !errors.Is(groupErr, schema.ErrGRPCAlreadyExists) 
{
+               log.Error().Err(groupErr).Msg("Failed to create native 
observability group")
+       }
+       np.mu.Lock()
+       pending := np.pendingMeasures
+       np.pendingMeasures = nil
+       np.mu.Unlock()
+       for _, pm := range pending {
+               _, measureErr := np.createMeasure(ctx, pm.name, pm.labels...)
+               if measureErr != nil && !errors.Is(measureErr, 
schema.ErrGRPCAlreadyExists) {
+                       log.Error().Err(measureErr).Msgf("Failed to create 
measure %s", pm.name)
+               }
        }
-       return p
 }
 
 // Counter returns a native implementation of the Counter interface.
 func (p *provider) Counter(name string, labelNames ...string) meter.Counter {
-       name, err := p.createMeasure(name, labelNames...)
-       if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
-               log.Error().Err(err).Msgf("Failure to createMeasure for Counter 
%s, labels: %v", name, labelNames)
-       }
+       p.registerOrDefer(name, labelNames)
        return &Counter{
                newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
-// Gauge returns a nativeimplementation of the Gauge interface.
+// Gauge returns a native implementation of the Gauge interface.
 func (p *provider) Gauge(name string, labelNames ...string) meter.Gauge {
-       name, err := p.createMeasure(name, labelNames...)
-       if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
-               log.Error().Err(err).Msgf("Failure to createMeasure for Gauge 
%s, labels: %v", name, labelNames)
-       }
+       p.registerOrDefer(name, labelNames)
        return &Gauge{
                newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
 // Histogram returns a native implementation of the Histogram interface.
-func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string) 
meter.Histogram {
+func (p *provider) Histogram(name string, _ meter.Buckets, labelNames 
...string) meter.Histogram {
+       p.registerOrDefer(name, labelNames)
        return &Histogram{
                newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
+func (p *provider) registerOrDefer(name string, labels []string) {
+       if p.initialized.Load() {
+               _, measureErr := p.createMeasure(context.Background(), name, 
labels...)
+               if measureErr != nil && !errors.Is(measureErr, 
schema.ErrGRPCAlreadyExists) {
+                       log.Error().Err(measureErr).Msgf("Failed to create 
measure %s", name)
+               }
+               return
+       }
+       p.mu.Lock()
+       p.pendingMeasures = append(p.pendingMeasures, pendingMeasure{name: 
name, labels: labels})
+       p.mu.Unlock()

Review Comment:
   `registerOrDefer` calls `createMeasure(context.Background(), ...)` without 
any timeout/cancellation. This can block the caller (metric creation path) 
indefinitely if the metadata service is slow/unavailable. Previously this path 
used a timeout; consider wrapping the background context with a bounded timeout 
(or reusing a configured timeout) and also copying `labels` when enqueuing to 
avoid retaining/mutating the caller’s slice.



##########
banyand/observability/services/meter_native.go:
##########
@@ -19,20 +19,32 @@ package services
 
 import (
        "context"
-       "time"
+       "sync"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/meter/native"
 )
 
 type nativeProviderFactory struct {
-       metadata metadata.Repo
-       nodeInfo native.NodeInfo
+       metadata  metadata.Repo
+       nodeInfo  native.NodeInfo
+       providers []meter.Provider
+       mu        sync.Mutex
 }
 
-func (f nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
-       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-       defer cancel()
-       return native.NewProvider(ctx, scope, f.metadata, f.nodeInfo)
+func (f *nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
+       p := native.NewProvider(scope, f.metadata, f.nodeInfo)
+       f.mu.Lock()
+       f.providers = append(f.providers, p)
+       f.mu.Unlock()
+       return p
+}
+
+func (f *nativeProviderFactory) initAllSchemas(ctx context.Context) {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       for _, p := range f.providers {

Review Comment:
   `initAllSchemas` holds `f.mu` while calling `native.InitSchema` for each 
provider. `InitSchema` performs RPCs and can block; keeping the factory mutex 
held that long can stall concurrent provider creation and risks lock 
contention. Consider snapshotting `f.providers` under the lock, releasing it, 
then iterating/calling `InitSchema` without holding the mutex.
   ```suggestion
        providers := append([]meter.Provider(nil), f.providers...)
        f.mu.Unlock()
        for _, p := range providers {
   ```



##########
banyand/metadata/service/server.go:
##########
@@ -194,6 +212,30 @@ func NewService(_ context.Context) (metadata.Service, 
error) {
        return s, nil
 }
 
+func (s *server) enrichContextWithSchemaAddress(ctx context.Context) 
context.Context {
+       port := s.propServer.GetPort()
+       if port == nil {
+               return ctx
+       }
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return ctx
+       }
+       node := val.(common.Node)

Review Comment:
   `enrichContextWithSchemaAddress` does a direct type assertion 
`val.(common.Node)` which will panic if the context value isn’t the expected 
type. Since this function is on the startup path, consider using the `node, ok 
:= val.(common.Node)` form and returning `ctx` when the assertion fails.
   ```suggestion
        node, ok := val.(common.Node)
        if !ok {
                return ctx
        }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to