This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a81f77b [GSoC][BanyanDB] Self-Observability: Write Metric Data to 
Measure In Liaison and Data mode (#475)
3a81f77b is described below

commit 3a81f77b47c13b9715fefbe09f6f99d3575c050f
Author: Sylvie-Wxr <129717259+sylvie-...@users.noreply.github.com>
AuthorDate: Wed Jun 26 17:51:54 2024 -0700

    [GSoC][BanyanDB] Self-Observability: Write Metric Data to Measure In 
Liaison and Data mode (#475)
    
    * add new local pipeline to units
    
    * add nodeSelector for liaison node
    
    ---------
    
    Co-authored-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/measure/measure_suite_test.go |  2 +-
 banyand/measure/service.go            | 32 ++++++++++++++++--------
 banyand/observability/meter_native.go |  4 +--
 banyand/observability/service.go      | 47 ++++++++++++++++++++++++-----------
 pkg/cmdsetup/data.go                  |  7 ++++--
 pkg/cmdsetup/liaison.go               |  5 ++--
 pkg/cmdsetup/standalone.go            |  4 +--
 pkg/meter/native/collection.go        | 26 +++++++++++++++----
 pkg/meter/native/instruments.go       |  2 +-
 pkg/meter/native/provider.go          | 36 ++++++++++++++++++++-------
 pkg/meter/native/vec.go               |  8 +++---
 11 files changed, 120 insertions(+), 53 deletions(-)

diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
index 8765c2b2..c7e8b224 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -77,7 +77,7 @@ func setUp() (*services, func()) {
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        // Init Measure Service
-       measureService, err := measure.NewService(context.TODO(), 
metadataService, pipeline)
+       measureService, err := measure.NewService(context.TODO(), 
metadataService, pipeline, nil)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
        var flags []string
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 7b7ab71c..85aeb1ed 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -53,14 +53,15 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
-       schemaRepo    *schemaRepo
-       writeListener bus.MessageListener
-       metadata      metadata.Repo
-       pipeline      queue.Server
-       localPipeline queue.Queue
-       option        option
-       l             *logger.Logger
-       root          string
+       schemaRepo     *schemaRepo
+       writeListener  bus.MessageListener
+       metadata       metadata.Repo
+       pipeline       queue.Server
+       localPipeline  queue.Queue
+       metricPipeline queue.Server
+       option         option
+       l              *logger.Logger
+       root           string
 }
 
 func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -108,6 +109,14 @@ func (s *service) PreRun(_ context.Context) error {
        // run a serial watcher
 
        s.writeListener = setUpWriteCallback(s.l, s.schemaRepo)
+       // only subscribe metricPipeline for data node
+       if s.metricPipeline != nil {
+               err := s.metricPipeline.Subscribe(data.TopicMeasureWrite, 
s.writeListener)
+               if err != nil {
+                       s.l.Err(err).Msgf("Fail to subscribe metricPipeline, 
%v", err)
+                       return err
+               }
+       }
        err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
        if err != nil {
                return err
@@ -125,9 +134,10 @@ func (s *service) GracefulStop() {
 }
 
 // NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, pipeline 
queue.Server) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline 
queue.Server, metricPipeline queue.Server) (Service, error) {
        return &service{
-               metadata: metadata,
-               pipeline: pipeline,
+               metadata:       metadata,
+               pipeline:       pipeline,
+               metricPipeline: metricPipeline,
        }, nil
 }
diff --git a/banyand/observability/meter_native.go 
b/banyand/observability/meter_native.go
index 0f86da94..e239128d 100644
--- a/banyand/observability/meter_native.go
+++ b/banyand/observability/meter_native.go
@@ -35,8 +35,8 @@ var (
 )
 
 // NewMeterProvider returns a meter.Provider based on the given scope.
-func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo) 
meter.Provider {
-       return native.NewProvider(ctx, SystemScope, metadata)
+func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo, 
nodeInfo native.NodeInfo) meter.Provider {
+       return native.NewProvider(ctx, SystemScope, metadata, nodeInfo)
 }
 
 // MetricsServerInterceptor returns a grpc.UnaryServerInterceptor and a 
grpc.StreamServerInterceptor.
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index a0b8baf2..8e41c5ed 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -19,6 +19,7 @@ package observability
 
 import (
        "context"
+       "errors"
        "net/http"
        "sync"
        "time"
@@ -26,6 +27,7 @@ import (
        "github.com/robfig/cron/v3"
        "google.golang.org/grpc"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -54,24 +56,28 @@ type Service interface {
 }
 
 // NewMetricService returns a metric service.
-func NewMetricService(metadata metadata.Repo, pipeline queue.Client) Service {
+func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType 
string, nodeSelector native.NodeSelector) Service {
        return &metricService{
-               closer:   run.NewCloser(1),
-               metadata: metadata,
-               pipeline: pipeline,
+               closer:       run.NewCloser(1),
+               metadata:     metadata,
+               pipeline:     pipeline,
+               nodeType:     nodeType,
+               nodeSelector: nodeSelector,
        }
 }
 
 type metricService struct {
-       l          *logger.Logger
-       svr        *http.Server
-       closer     *run.Closer
-       scheduler  *timestamp.Scheduler
-       metadata   metadata.Repo
-       pipeline   queue.Client
-       listenAddr string
-       modes      []string
-       mutex      sync.Mutex
+       l            *logger.Logger
+       svr          *http.Server
+       closer       *run.Closer
+       scheduler    *timestamp.Scheduler
+       metadata     metadata.Repo
+       pipeline     queue.Client
+       nodeSelector native.NodeSelector
+       listenAddr   string
+       nodeType     string
+       modes        []string
+       mutex        sync.Mutex
 }
 
 func (p *metricService) FlagSet() *run.FlagSet {
@@ -106,8 +112,19 @@ func (p *metricService) PreRun(ctx context.Context) error {
                MetricsServerInterceptor = promMetricsServerInterceptor
        }
        if containsMode(p.modes, flagNativeMode) {
-               NativeMetricCollection = native.NewMetricsCollection(p.pipeline)
-               NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata)
+               NativeMetricCollection = 
native.NewMetricsCollection(p.pipeline, p.nodeSelector)
+               val := ctx.Value(common.ContextNodeKey)
+               if val == nil {
+                       return errors.New("metric service native mode, node id 
is empty")
+               }
+               node := val.(common.Node)
+               nodeInfo := native.NodeInfo{
+                       Type:        p.nodeType,
+                       NodeID:      node.NodeID,
+                       GrpcAddress: node.GrpcAddress,
+                       HTTPAddress: node.HTTPAddress,
+               }
+               NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, 
nodeInfo)
        }
        initMetrics(p.modes)
        return nil
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 991804ab..21ab9b4a 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/query"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/queue/sub"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,11 +44,12 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
        pipeline := sub.NewServer()
+       localPipeline := queue.Local()
        streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
-       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
+       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, 
localPipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
@@ -57,12 +59,13 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc, nil)
+       metricSvc := observability.NewMetricService(metaSvc, localPipeline, 
"data", nil)
 
        var units []run.Unit
        units = append(units, runners...)
        units = append(units,
                metaSvc,
+               localPipeline,
                pipeline,
                measureSvc,
                streamSvc,
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 152ba7a0..a7112e94 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -50,9 +50,10 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate required node 
selector")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
grpc.NewClusterNodeRegistry(pipeline, nodeSel))
+       nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
+       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry)
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc, nil)
+       metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"liaison", nodeRegistry)
        httpServer := http.NewServer()
        dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
        if err != nil {
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index b98645d1..31b6daa7 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -49,7 +49,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
-       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
+       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
@@ -59,7 +59,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        }
        grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc, pipeline)
+       metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"standalone", nil)
        httpServer := http.NewServer()
 
        var units []run.Unit
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index 82630fa9..9904e11a 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -31,20 +31,27 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
+// NodeSelector has Locate method to select a nodeId.
+type NodeSelector interface {
+       Locate(group, name string, shardID uint32) (string, error)
+}
+
 type collector interface {
        Collect() (string, []metricWithLabelValues)
 }
 
 // MetricCollection contains all the native implementations of metrics.
 type MetricCollection struct {
-       pipeline   queue.Client
-       collectors []collector
+       pipeline     queue.Client
+       nodeSelector NodeSelector
+       collectors   []collector
 }
 
 // NewMetricsCollection creates a new MetricCollection.
-func NewMetricsCollection(pipeline queue.Client) MetricCollection {
+func NewMetricsCollection(pipeline queue.Client, nodeSelector NodeSelector) 
MetricCollection {
        return MetricCollection{
-               pipeline: pipeline,
+               pipeline:     pipeline,
+               nodeSelector: nodeSelector,
        }
 }
 
@@ -65,7 +72,16 @@ func (m *MetricCollection) FlushMetrics() {
                name, metrics := collector.Collect()
                for _, metric := range metrics {
                        iwr := m.buildIWR(name, metric)
-                       messages = append(messages, 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", iwr))
+                       nodeID := ""
+                       var err error
+                       // only liaison node has a non-nil nodeSelector
+                       if m.nodeSelector != nil {
+                               nodeID, err = 
m.nodeSelector.Locate(iwr.GetRequest().GetMetadata().GetGroup(), 
iwr.GetRequest().GetMetadata().GetName(), uint32(0))
+                               if err != nil {
+                                       log.Error().Err(err).Msg("Failed to 
locate nodeID")
+                               }
+                       }
+                       messages = append(messages, 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr))
                }
        }
        _, err := publisher.Publish(data.TopicMeasureWrite, messages...)
diff --git a/pkg/meter/native/instruments.go b/pkg/meter/native/instruments.go
index d281a9a4..fcbee215 100644
--- a/pkg/meter/native/instruments.go
+++ b/pkg/meter/native/instruments.go
@@ -37,7 +37,7 @@ func (g *Gauge) Add(delta float64, labelValues ...string) {
 func (g *Gauge) Set(value float64, labelValues ...string) {
        g.mutex.Lock()
        defer g.mutex.Unlock()
-       tagValues := buildTagValues(g.scope, labelValues...)
+       tagValues := buildTagValues(g.nodeInfo, g.scope, labelValues...)
        hash := seriesHash(tagValues)
        key := string(hash)
        g.metrics[key] = metricWithLabelValues{
diff --git a/pkg/meter/native/provider.go b/pkg/meter/native/provider.go
index 97c2747f..1ebc173b 100644
--- a/pkg/meter/native/provider.go
+++ b/pkg/meter/native/provider.go
@@ -36,22 +36,34 @@ const (
        NativeObservabilityGroupName = "_monitoring"
        defaultTagFamily             = "default"
        defaultFieldName             = "value"
-       nodeNameTag                  = "node_name"
-       standaloneNodeName           = "standalone"
+       tagNodeType                  = "node_type"
+       tagNodeID                    = "node_id"
+       tagGRPCAddress               = "grpc_address"
+       tagHTTPAddress               = "http_address"
 )
 
 var log = logger.GetLogger("observability", "metrics", "system")
 
+// NodeInfo is the struct that contains information used in native 
observability mode.
+type NodeInfo struct {
+       Type        string
+       NodeID      string
+       GrpcAddress string
+       HTTPAddress string
+}
+
 type provider struct {
        metadata metadata.Repo
        scope    meter.Scope
+       nodeInfo NodeInfo
 }
 
 // NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata 
metadata.Repo) meter.Provider {
+func NewProvider(ctx context.Context, scope meter.Scope, metadata 
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
        p := &provider{
                scope:    scope,
                metadata: metadata,
+               nodeInfo: nodeInfo,
        }
        err := p.createNativeObservabilityGroup(ctx)
        if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
@@ -67,7 +79,7 @@ func (p *provider) Counter(name string, labelNames ...string) 
meter.Counter {
                log.Error().Err(err).Msgf("Failure to createMeasure for Counter 
%s, labels: %v", name, labelNames)
        }
        return &Counter{
-               newMetricVec(name, p.scope),
+               newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
@@ -78,14 +90,14 @@ func (p *provider) Gauge(name string, labelNames ...string) 
meter.Gauge {
                log.Error().Err(err).Msgf("Failure to createMeasure for Gauge 
%s, labels: %v", name, labelNames)
        }
        return &Gauge{
-               newMetricVec(name, p.scope),
+               newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
 // Histogram returns a native implementation of the Histogram interface.
 func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string) 
meter.Histogram {
        return &Histogram{
-               newMetricVec(name, p.scope),
+               newMetricVec(name, p.scope, p.nodeInfo),
        }
 }
 
@@ -151,7 +163,10 @@ func buildTags(scope meter.Scope, labels []string) 
([]*databasev1.TagSpec, []str
                        entityTags = append(entityTags, label)
                }
        }
-       addTags(nodeNameTag)
+       addTags(tagNodeType)
+       addTags(tagNodeID)
+       addTags(tagGRPCAddress)
+       addTags(tagHTTPAddress)
        for label := range scope.GetLabels() {
                addTags(label)
        }
@@ -159,7 +174,7 @@ func buildTags(scope meter.Scope, labels []string) 
([]*databasev1.TagSpec, []str
        return tags, entityTags
 }
 
-func buildTagValues(scope meter.Scope, labelValues ...string) 
[]*modelv1.TagValue {
+func buildTagValues(nodeInfo NodeInfo, scope meter.Scope, labelValues 
...string) []*modelv1.TagValue {
        var tagValues []*modelv1.TagValue
        addTagValues := func(labelValues ...string) {
                for _, value := range labelValues {
@@ -173,7 +188,10 @@ func buildTagValues(scope meter.Scope, labelValues 
...string) []*modelv1.TagValu
                        tagValues = append(tagValues, tagValue)
                }
        }
-       addTagValues(standaloneNodeName)
+       addTagValues(nodeInfo.Type)
+       addTagValues(nodeInfo.NodeID)
+       addTagValues(nodeInfo.GrpcAddress)
+       addTagValues(nodeInfo.HTTPAddress)
        for _, labelValue := range scope.GetLabels() {
                addTagValues(labelValue)
        }
diff --git a/pkg/meter/native/vec.go b/pkg/meter/native/vec.go
index 3b93fcdb..3dd93f4f 100644
--- a/pkg/meter/native/vec.go
+++ b/pkg/meter/native/vec.go
@@ -38,14 +38,16 @@ type metricWithLabelValues struct {
 }
 
 type metricVec struct {
+       nodeInfo    NodeInfo
        scope       meter.Scope
        metrics     map[string]metricWithLabelValues
        measureName string
        mutex       sync.Mutex
 }
 
-func newMetricVec(measureName string, scope meter.Scope) *metricVec {
+func newMetricVec(measureName string, scope meter.Scope, nodeInfo NodeInfo) 
*metricVec {
        n := &metricVec{
+               nodeInfo:    nodeInfo,
                scope:       scope,
                measureName: measureName,
                metrics:     map[string]metricWithLabelValues{},
@@ -56,7 +58,7 @@ func newMetricVec(measureName string, scope meter.Scope) 
*metricVec {
 func (n *metricVec) Inc(delta float64, labelValues ...string) {
        n.mutex.Lock()
        defer n.mutex.Unlock()
-       tagValues := buildTagValues(n.scope, labelValues...)
+       tagValues := buildTagValues(n.nodeInfo, n.scope, labelValues...)
        hash := seriesHash(tagValues)
        key := string(hash)
        v, exist := n.metrics[key]
@@ -73,7 +75,7 @@ func (n *metricVec) Inc(delta float64, labelValues ...string) 
{
 func (n *metricVec) Delete(labelValues ...string) bool {
        n.mutex.Lock()
        defer n.mutex.Unlock()
-       key := string(seriesHash(buildTagValues(n.scope, labelValues...)))
+       key := string(seriesHash(buildTagValues(n.nodeInfo, n.scope, 
labelValues...)))
        delete(n.metrics, key)
        return true
 }

Reply via email to