This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch liaison-topology in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7982f866310251e8f279225906057b8dcea7dc11 Author: Megrez Lu <lujiajing1...@gmail.com> AuthorDate: Wed Sep 6 14:35:10 2023 +0800 setup selector --- banyand/internal/cmd/liaison.go | 2 +- banyand/internal/cmd/standalone.go | 2 +- banyand/liaison/grpc/discovery.go | 4 +- banyand/liaison/grpc/measure.go | 9 +++- banyand/liaison/grpc/node.go | 84 ++++++++++++++++++++++++++++++++++ banyand/liaison/grpc/registry_test.go | 2 +- banyand/liaison/grpc/server.go | 6 +-- banyand/liaison/grpc/stream.go | 8 +++- pkg/node/interface.go | 86 +++++++++++++++++++++++++++++++++++ pkg/test/setup/setup.go | 2 +- 10 files changed, 194 insertions(+), 11 deletions(-) diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go index 0b842201..0a8ad2cc 100644 --- a/banyand/internal/cmd/liaison.go +++ b/banyand/internal/cmd/liaison.go @@ -45,7 +45,7 @@ func newLiaisonCmd() *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate metadata service") } pipeline := pub.New(metaSvc) - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewClusterNodeRegistry(metaSvc)) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 8686db07..16a91192 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -60,7 +60,7 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index e2fb24c9..b10e16f7 100644 --- 
a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -41,6 +41,7 @@ var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { pipeline queue.Client + nodeRegistry NodeRegistry metadataRepo metadata.Repo shardRepo *shardRepo entityRepo *entityRepo @@ -48,10 +49,11 @@ type discoveryService struct { kind schema.Kind } -func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo) *discoveryService { +func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { sr := &shardRepo{shardEventsMap: make(map[identity]uint32)} er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)} return &discoveryService{ + nodeRegistry: nodeRegistry, shardRepo: sr, entityRepo: er, pipeline: pipeline, diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 8a541147..38deab8c 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -101,8 +101,13 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er SeriesHash: tsdb.HashEntity(entity), EntityValues: tagValues.Encode(), } - // TODO: set node id - message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "todo", iwr) + nodeId, errPickNode := ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), writeRequest.GetMetadata().GetName(), uint32(shardID)) + if errPickNode != nil { + ms.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to pick an available node") + reply(measure, ms.sampled) + continue + } + message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeId, iwr) _, errWritePub := publisher.Publish(data.TopicMeasureWrite, message) if errWritePub != nil { ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message") diff --git 
a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go new file mode 100644 index 00000000..dff0ca22 --- /dev/null +++ b/banyand/liaison/grpc/node.go @@ -0,0 +1,84 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package grpc + +import ( + "github.com/pkg/errors" + + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/node" +) + +var ( + _ schema.EventHandler = (*clusterNodeService)(nil) + _ NodeRegistry = (*clusterNodeService)(nil) + _ NodeRegistry = localNodeService{} +) + +// NodeRegistry resolves the ID of the node that a write for group/name/shardID is published to. +type NodeRegistry interface { + Locate(group, name string, shardID uint32) (string, error) +} + +// clusterNodeService is a NodeRegistry whose node set follows schema.KindNode events +// delivered by the metadata repository. +type clusterNodeService struct { + metaRepo metadata.Repo + sel node.Selector +} + +// NewClusterNodeRegistry returns a NodeRegistry that registers itself on metaRepo as the +// "cluster-node-service" handler for schema.KindNode events and picks nodes with a +// pick-first selector. +func NewClusterNodeRegistry(metaRepo metadata.Repo) NodeRegistry { + cns := &clusterNodeService{ + metaRepo: metaRepo, + sel: node.NewPickFirstSelector(), + } + cns.metaRepo.RegisterHandler("cluster-node-service", schema.KindNode, cns) + return cns +} + +// Locate asks the selector for a node and wraps any failure with the requested target. +func (n *clusterNodeService) Locate(group, name string, shardID uint32) (string, error) { + nodeID, err := n.sel.Pick(group, name, shardID) + if err != nil { + return "", errors.Wrapf(err, "fail to locate %s/%s(%d)", group, name, shardID) + } + return nodeID, nil +} + +// OnAddOrUpdate adds the node named by md to the selector when md is a node event. +// The parameter is named md so it does not shadow the imported metadata package. +func (n *clusterNodeService) OnAddOrUpdate(md schema.Metadata) { + if md.Kind == schema.KindNode { + n.sel.AddNode(md.Name) + } +} + +// OnDelete removes the node named by md from the selector when md is a node event. +func (n *clusterNodeService) OnDelete(md schema.Metadata) { + if md.Kind == schema.KindNode { + n.sel.RemoveNode(md.Name) + } +} + +// localNodeService is a NodeRegistry that routes every shard to the fixed node ID "local". +type localNodeService struct{} + +// NewLocalNodeRegistry returns a NodeRegistry whose Locate always reports the "local" node. +func NewLocalNodeRegistry() NodeRegistry { + return localNodeService{} +} + +// Locate always returns "local" and never fails; its arguments are ignored. +func (localNodeService) Locate(_, _ string, _ uint32) (string, error) { + return "local", nil +} diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index 6c86d780..6dff21fa 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -178,7 +178,7 @@ func setupForRegistry() func() { metaSvc, err := metadata.NewService(context.TODO())
Expect(err).NotTo(HaveOccurred()) - tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc) + tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, grpc.NewLocalNodeRegistry()) preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc} var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index cc7e0817..ba98016e 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -91,12 +91,12 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo) Server { +func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server { streamSVC := &streamService{ - discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry), + discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry, nodeRegistry), } measureSVC := &measureService{ - discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry), + discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry, nodeRegistry), } s := &server{ pipeline: pipeline, diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index e4cc1ed1..06b78112 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -103,7 +103,13 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { if s.log.Debug().Enabled() { iwr.EntityValues = tagValues.Encode() } - message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "TODO", iwr) + nodeId, errPickNode := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID)) + if errPickNode != nil { + s.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to pick an available node") + 
reply(stream, s.sampled) + continue + } + message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeId, iwr) _, errWritePub := publisher.Publish(data.TopicStreamWrite, message) if errWritePub != nil { s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message") diff --git a/pkg/node/interface.go b/pkg/node/interface.go new file mode 100644 index 00000000..a12c36e0 --- /dev/null +++ b/pkg/node/interface.go @@ -0,0 +1,86 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Package node provides node selector for liaison. +package node + +import ( + "sync" + + "github.com/pkg/errors" + "golang.org/x/exp/slices" +) + +var ( + _ Selector = (*pickFirstSelector)(nil) + + // ErrNoAvailableNode is returned by Pick when no node is registered. + ErrNoAvailableNode = errors.New("selector: no available node") +) + +// Selector tracks a set of node IDs and picks one of them for a shard. +type Selector interface { + // AddNode registers nodeID. Adding an already-registered ID is a no-op. + AddNode(nodeID string) + // RemoveNode unregisters nodeID. Removing an unknown ID is a no-op. + RemoveNode(nodeID string) + // Pick returns the ID of a node for the given group, name and shard. + Pick(group, name string, shardID uint32) (string, error) +} + +// NewPickFirstSelector returns a Selector that always picks the smallest registered node ID. +func NewPickFirstSelector() Selector { + return &pickFirstSelector{nodeIDMap: make(map[string]struct{})} +} + +// pickFirstSelector always picks the first node in the sorted node ID list. +// nodeIDs is kept sorted and holds exactly the keys of nodeIDMap; both are guarded by mu. +type pickFirstSelector struct { + nodeIDMap map[string]struct{} + nodeIDs []string + mu sync.RWMutex +} + +// AddNode registers nodeID and keeps nodeIDs sorted. +// The membership check and the insertion run under a single write lock so two +// concurrent calls cannot both insert the same ID. +func (p *pickFirstSelector) AddNode(nodeID string) { + p.mu.Lock() + defer p.mu.Unlock() + if p.nodeIDMap == nil { + // Writing to a nil map panics; this also keeps the zero value usable. + p.nodeIDMap = make(map[string]struct{}) + } + if _, ok := p.nodeIDMap[nodeID]; ok { + return + } + p.nodeIDMap[nodeID] = struct{}{} + p.nodeIDs = append(p.nodeIDs, nodeID) + slices.Sort(p.nodeIDs) +} + +// RemoveNode unregisters nodeID; unknown IDs are ignored. +// Deleting from the sorted slice preserves its ordering. +func (p *pickFirstSelector) RemoveNode(nodeID string) { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.nodeIDMap[nodeID]; !ok { + return + } + delete(p.nodeIDMap, nodeID) + if idx := slices.Index(p.nodeIDs, nodeID); idx >= 0 { + p.nodeIDs = slices.Delete(p.nodeIDs, idx, idx+1) + } +} + +// Pick returns the smallest registered node ID; group, name and shardID are ignored. +// It returns ErrNoAvailableNode when no node is registered. +func (p *pickFirstSelector) Pick(_, _ string, _ uint32) (string, error) { + p.mu.RLock() + defer p.mu.RUnlock() + if len(p.nodeIDs) == 0 { + return "", ErrNoAvailableNode + } + return p.nodeIDs[0], nil +} diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index e7fb94c7..281ffe8a 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -92,7 +92,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) func() { // Init `Query` module q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - tcp := grpc.NewServer(context.TODO(), pipeline,
metaSvc, grpc.NewLocalNodeRegistry()) httpServer := http.NewServer() units := []run.Unit{