This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 3a81f77b [GSoC][BanyanDB] Self-Observability: Write Metric Data to Measure In Liaison and Data mode (#475) 3a81f77b is described below commit 3a81f77b47c13b9715fefbe09f6f99d3575c050f Author: Sylvie-Wxr <129717259+sylvie-...@users.noreply.github.com> AuthorDate: Wed Jun 26 17:51:54 2024 -0700 [GSoC][BanyanDB] Self-Observability: Write Metric Data to Measure In Liaison and Data mode (#475) * add new local pipeline to units * add nodeSelector for liaison node --------- Co-authored-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/measure/measure_suite_test.go | 2 +- banyand/measure/service.go | 32 ++++++++++++++++-------- banyand/observability/meter_native.go | 4 +-- banyand/observability/service.go | 47 ++++++++++++++++++++++++----------- pkg/cmdsetup/data.go | 7 ++++-- pkg/cmdsetup/liaison.go | 5 ++-- pkg/cmdsetup/standalone.go | 4 +-- pkg/meter/native/collection.go | 26 +++++++++++++++---- pkg/meter/native/instruments.go | 2 +- pkg/meter/native/provider.go | 36 ++++++++++++++++++++------- pkg/meter/native/vec.go | 8 +++--- 11 files changed, 120 insertions(+), 53 deletions(-) diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index 8765c2b2..c7e8b224 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -77,7 +77,7 @@ func setUp() (*services, func()) { gomega.Expect(err).NotTo(gomega.HaveOccurred()) // Init Measure Service - measureService, err := measure.NewService(context.TODO(), metadataService, pipeline) + measureService, err := measure.NewService(context.TODO(), metadataService, pipeline, nil) gomega.Expect(err).NotTo(gomega.HaveOccurred()) preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService} var flags []string diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 7b7ab71c..85aeb1ed 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -53,14 +53,15
@@ type Service interface { var _ Service = (*service)(nil) type service struct { - schemaRepo *schemaRepo - writeListener bus.MessageListener - metadata metadata.Repo - pipeline queue.Server - localPipeline queue.Queue - option option - l *logger.Logger - root string + schemaRepo *schemaRepo + writeListener bus.MessageListener + metadata metadata.Repo + pipeline queue.Server + localPipeline queue.Queue + metricPipeline queue.Server + option option + l *logger.Logger + root string } func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) { @@ -108,6 +109,14 @@ func (s *service) PreRun(_ context.Context) error { // run a serial watcher s.writeListener = setUpWriteCallback(s.l, s.schemaRepo) + // only subscribe metricPipeline for data node + if s.metricPipeline != nil { + err := s.metricPipeline.Subscribe(data.TopicMeasureWrite, s.writeListener) + if err != nil { + s.l.Err(err).Msgf("Fail to subscribe metricPipeline, %v", err) + return err + } + } err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener) if err != nil { return err @@ -125,9 +134,10 @@ func (s *service) GracefulStop() { } // NewService returns a new service. -func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Server) (Service, error) { +func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Server, metricPipeline queue.Server) (Service, error) { return &service{ - metadata: metadata, - pipeline: pipeline, + metadata: metadata, + pipeline: pipeline, + metricPipeline: metricPipeline, }, nil } diff --git a/banyand/observability/meter_native.go b/banyand/observability/meter_native.go index 0f86da94..e239128d 100644 --- a/banyand/observability/meter_native.go +++ b/banyand/observability/meter_native.go @@ -35,8 +35,8 @@ var ( ) // NewMeterProvider returns a meter.Provider based on the given scope. 
-func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo) meter.Provider { - return native.NewProvider(ctx, SystemScope, metadata) +func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo, nodeInfo native.NodeInfo) meter.Provider { + return native.NewProvider(ctx, SystemScope, metadata, nodeInfo) } // MetricsServerInterceptor returns a grpc.UnaryServerInterceptor and a grpc.StreamServerInterceptor. diff --git a/banyand/observability/service.go b/banyand/observability/service.go index a0b8baf2..8e41c5ed 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -19,6 +19,7 @@ package observability import ( "context" + "errors" "net/http" "sync" "time" @@ -26,6 +27,7 @@ import ( "github.com/robfig/cron/v3" "google.golang.org/grpc" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -54,24 +56,28 @@ type Service interface { } // NewMetricService returns a metric service. 
-func NewMetricService(metadata metadata.Repo, pipeline queue.Client) Service { +func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType string, nodeSelector native.NodeSelector) Service { return &metricService{ - closer: run.NewCloser(1), - metadata: metadata, - pipeline: pipeline, + closer: run.NewCloser(1), + metadata: metadata, + pipeline: pipeline, + nodeType: nodeType, + nodeSelector: nodeSelector, } } type metricService struct { - l *logger.Logger - svr *http.Server - closer *run.Closer - scheduler *timestamp.Scheduler - metadata metadata.Repo - pipeline queue.Client - listenAddr string - modes []string - mutex sync.Mutex + l *logger.Logger + svr *http.Server + closer *run.Closer + scheduler *timestamp.Scheduler + metadata metadata.Repo + pipeline queue.Client + nodeSelector native.NodeSelector + listenAddr string + nodeType string + modes []string + mutex sync.Mutex } func (p *metricService) FlagSet() *run.FlagSet { @@ -106,8 +112,19 @@ func (p *metricService) PreRun(ctx context.Context) error { MetricsServerInterceptor = promMetricsServerInterceptor } if containsMode(p.modes, flagNativeMode) { - NativeMetricCollection = native.NewMetricsCollection(p.pipeline) - NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata) + NativeMetricCollection = native.NewMetricsCollection(p.pipeline, p.nodeSelector) + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("metric service native mode, node id is empty") + } + node := val.(common.Node) + nodeInfo := native.NodeInfo{ + Type: p.nodeType, + NodeID: node.NodeID, + GrpcAddress: node.GrpcAddress, + HTTPAddress: node.HTTPAddress, + } + NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata, nodeInfo) } initMetrics(p.modes) return nil diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index 991804ab..21ab9b4a 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" 
"github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/query" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/sub" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -43,11 +44,12 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate metadata service") } pipeline := sub.NewServer() + localPipeline := queue.Local() streamSvc, err := stream.NewService(ctx, metaSvc, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } - measureSvc, err := measure.NewService(ctx, metaSvc, pipeline) + measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, localPipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } @@ -57,12 +59,13 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate query processor") } profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService(metaSvc, nil) + metricSvc := observability.NewMetricService(metaSvc, localPipeline, "data", nil) var units []run.Unit units = append(units, runners...) 
units = append(units, metaSvc, + localPipeline, pipeline, measureSvc, streamSvc, diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index 152ba7a0..a7112e94 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -50,9 +50,10 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate required node selector") } - grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline, nodeSel)) + nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel) + grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry) profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService(metaSvc, nil) + metricSvc := observability.NewMetricService(metaSvc, pipeline, "liaison", nodeRegistry) httpServer := http.NewServer() dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline) if err != nil { diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index b98645d1..31b6daa7 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -49,7 +49,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate stream service") } - measureSvc, err := measure.NewService(ctx, metaSvc, pipeline) + measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil) if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } @@ -59,7 +59,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { } grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService(metaSvc, pipeline) + metricSvc := observability.NewMetricService(metaSvc, pipeline, "standalone", nil) httpServer := http.NewServer() var units []run.Unit diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go 
index 82630fa9..9904e11a 100644 --- a/pkg/meter/native/collection.go +++ b/pkg/meter/native/collection.go @@ -31,20 +31,27 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bus" ) +// NodeSelector has Locate method to select a nodeId. +type NodeSelector interface { + Locate(group, name string, shardID uint32) (string, error) +} + type collector interface { Collect() (string, []metricWithLabelValues) } // MetricCollection contains all the native implementations of metrics. type MetricCollection struct { - pipeline queue.Client - collectors []collector + pipeline queue.Client + nodeSelector NodeSelector + collectors []collector } // NewMetricsCollection creates a new MetricCollection. -func NewMetricsCollection(pipeline queue.Client) MetricCollection { +func NewMetricsCollection(pipeline queue.Client, nodeSelector NodeSelector) MetricCollection { return MetricCollection{ - pipeline: pipeline, + pipeline: pipeline, + nodeSelector: nodeSelector, } } @@ -65,7 +72,16 @@ func (m *MetricCollection) FlushMetrics() { name, metrics := collector.Collect() for _, metric := range metrics { iwr := m.buildIWR(name, metric) - messages = append(messages, bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", iwr)) + nodeID := "" + var err error + // only liaison node has a non-nil nodeSelector + if m.nodeSelector != nil { + nodeID, err = m.nodeSelector.Locate(iwr.GetRequest().GetMetadata().GetGroup(), iwr.GetRequest().GetMetadata().GetName(), uint32(0)) + if err != nil { + log.Error().Err(err).Msg("Failed to locate nodeID") + } + } + messages = append(messages, bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)) } } _, err := publisher.Publish(data.TopicMeasureWrite, messages...) 
diff --git a/pkg/meter/native/instruments.go b/pkg/meter/native/instruments.go index d281a9a4..fcbee215 100644 --- a/pkg/meter/native/instruments.go +++ b/pkg/meter/native/instruments.go @@ -37,7 +37,7 @@ func (g *Gauge) Add(delta float64, labelValues ...string) { func (g *Gauge) Set(value float64, labelValues ...string) { g.mutex.Lock() defer g.mutex.Unlock() - tagValues := buildTagValues(g.scope, labelValues...) + tagValues := buildTagValues(g.nodeInfo, g.scope, labelValues...) hash := seriesHash(tagValues) key := string(hash) g.metrics[key] = metricWithLabelValues{ diff --git a/pkg/meter/native/provider.go b/pkg/meter/native/provider.go index 97c2747f..1ebc173b 100644 --- a/pkg/meter/native/provider.go +++ b/pkg/meter/native/provider.go @@ -36,22 +36,34 @@ const ( NativeObservabilityGroupName = "_monitoring" defaultTagFamily = "default" defaultFieldName = "value" - nodeNameTag = "node_name" - standaloneNodeName = "standalone" + tagNodeType = "node_type" + tagNodeID = "node_id" + tagGRPCAddress = "grpc_address" + tagHTTPAddress = "http_address" ) var log = logger.GetLogger("observability", "metrics", "system") +// NodeInfo is the struct that contains information used in native observability mode. +type NodeInfo struct { + Type string + NodeID string + GrpcAddress string + HTTPAddress string +} + type provider struct { metadata metadata.Repo scope meter.Scope + nodeInfo NodeInfo } // NewProvider returns a native metrics Provider. 
-func NewProvider(ctx context.Context, scope meter.Scope, metadata metadata.Repo) meter.Provider { +func NewProvider(ctx context.Context, scope meter.Scope, metadata metadata.Repo, nodeInfo NodeInfo) meter.Provider { p := &provider{ scope: scope, metadata: metadata, + nodeInfo: nodeInfo, } err := p.createNativeObservabilityGroup(ctx) if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) { @@ -67,7 +79,7 @@ func (p *provider) Counter(name string, labelNames ...string) meter.Counter { log.Error().Err(err).Msgf("Failure to createMeasure for Counter %s, labels: %v", name, labelNames) } return &Counter{ - newMetricVec(name, p.scope), + newMetricVec(name, p.scope, p.nodeInfo), } } @@ -78,14 +90,14 @@ func (p *provider) Gauge(name string, labelNames ...string) meter.Gauge { log.Error().Err(err).Msgf("Failure to createMeasure for Gauge %s, labels: %v", name, labelNames) } return &Gauge{ - newMetricVec(name, p.scope), + newMetricVec(name, p.scope, p.nodeInfo), } } // Histogram returns a native implementation of the Histogram interface. 
func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string) meter.Histogram { return &Histogram{ - newMetricVec(name, p.scope), + newMetricVec(name, p.scope, p.nodeInfo), } } @@ -151,7 +163,10 @@ func buildTags(scope meter.Scope, labels []string) ([]*databasev1.TagSpec, []str entityTags = append(entityTags, label) } } - addTags(nodeNameTag) + addTags(tagNodeType) + addTags(tagNodeID) + addTags(tagGRPCAddress) + addTags(tagHTTPAddress) for label := range scope.GetLabels() { addTags(label) } @@ -159,7 +174,7 @@ func buildTags(scope meter.Scope, labels []string) ([]*databasev1.TagSpec, []str return tags, entityTags } -func buildTagValues(scope meter.Scope, labelValues ...string) []*modelv1.TagValue { +func buildTagValues(nodeInfo NodeInfo, scope meter.Scope, labelValues ...string) []*modelv1.TagValue { var tagValues []*modelv1.TagValue addTagValues := func(labelValues ...string) { for _, value := range labelValues { @@ -173,7 +188,10 @@ func buildTagValues(scope meter.Scope, labelValues ...string) []*modelv1.TagValu tagValues = append(tagValues, tagValue) } } - addTagValues(standaloneNodeName) + addTagValues(nodeInfo.Type) + addTagValues(nodeInfo.NodeID) + addTagValues(nodeInfo.GrpcAddress) + addTagValues(nodeInfo.HTTPAddress) for _, labelValue := range scope.GetLabels() { addTagValues(labelValue) } diff --git a/pkg/meter/native/vec.go b/pkg/meter/native/vec.go index 3b93fcdb..3dd93f4f 100644 --- a/pkg/meter/native/vec.go +++ b/pkg/meter/native/vec.go @@ -38,14 +38,16 @@ type metricWithLabelValues struct { } type metricVec struct { + nodeInfo NodeInfo scope meter.Scope metrics map[string]metricWithLabelValues measureName string mutex sync.Mutex } -func newMetricVec(measureName string, scope meter.Scope) *metricVec { +func newMetricVec(measureName string, scope meter.Scope, nodeInfo NodeInfo) *metricVec { n := &metricVec{ + nodeInfo: nodeInfo, scope: scope, measureName: measureName, metrics: map[string]metricWithLabelValues{}, @@ -56,7 +58,7 @@ func 
newMetricVec(measureName string, scope meter.Scope) *metricVec { func (n *metricVec) Inc(delta float64, labelValues ...string) { n.mutex.Lock() defer n.mutex.Unlock() - tagValues := buildTagValues(n.scope, labelValues...) + tagValues := buildTagValues(n.nodeInfo, n.scope, labelValues...) hash := seriesHash(tagValues) key := string(hash) v, exist := n.metrics[key] @@ -73,7 +75,7 @@ func (n *metricVec) Inc(delta float64, labelValues ...string) { func (n *metricVec) Delete(labelValues ...string) bool { n.mutex.Lock() defer n.mutex.Unlock() - key := string(seriesHash(buildTagValues(n.scope, labelValues...))) + key := string(seriesHash(buildTagValues(n.nodeInfo, n.scope, labelValues...))) delete(n.metrics, key) return true }