This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch liaison-topology
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7982f866310251e8f279225906057b8dcea7dc11
Author: Megrez Lu <lujiajing1...@gmail.com>
AuthorDate: Wed Sep 6 14:35:10 2023 +0800

    setup selector
---
 banyand/internal/cmd/liaison.go       |  2 +-
 banyand/internal/cmd/standalone.go    |  2 +-
 banyand/liaison/grpc/discovery.go     |  4 +-
 banyand/liaison/grpc/measure.go       |  9 +++-
 banyand/liaison/grpc/node.go          | 84 ++++++++++++++++++++++++++++++++++
 banyand/liaison/grpc/registry_test.go |  2 +-
 banyand/liaison/grpc/server.go        |  6 +--
 banyand/liaison/grpc/stream.go        |  8 +++-
 pkg/node/interface.go                 | 86 +++++++++++++++++++++++++++++++++++
 pkg/test/setup/setup.go               |  2 +-
 10 files changed, 194 insertions(+), 11 deletions(-)

diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go
index 0b842201..0a8ad2cc 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/banyand/internal/cmd/liaison.go
@@ -45,7 +45,7 @@ func newLiaisonCmd() *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
        pipeline := pub.New(metaSvc)
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, 
grpc.NewClusterNodeRegistry(metaSvc))
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
diff --git a/banyand/internal/cmd/standalone.go 
b/banyand/internal/cmd/standalone.go
index 8686db07..16a91192 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -60,7 +60,7 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index e2fb24c9..b10e16f7 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -41,6 +41,7 @@ var errNotExist = errors.New("the object doesn't exist")
 
 type discoveryService struct {
        pipeline     queue.Client
+       nodeRegistry NodeRegistry
        metadataRepo metadata.Repo
        shardRepo    *shardRepo
        entityRepo   *entityRepo
@@ -48,10 +49,11 @@ type discoveryService struct {
        kind         schema.Kind
 }
 
-func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo 
metadata.Repo) *discoveryService {
+func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo 
metadata.Repo, nodeRegistry NodeRegistry) *discoveryService {
        sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
        er := &entityRepo{entitiesMap: 
make(map[identity]partition.EntityLocator)}
        return &discoveryService{
+               nodeRegistry: nodeRegistry,
                shardRepo:    sr,
                entityRepo:   er,
                pipeline:     pipeline,
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 8a541147..38deab8c 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -101,8 +101,13 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        SeriesHash:   tsdb.HashEntity(entity),
                        EntityValues: tagValues.Encode(),
                }
-               // TODO: set node id
-               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "todo", iwr)
+               nodeId, errPickNode := 
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), 
writeRequest.GetMetadata().GetName(), uint32(shardID))
+               if errPickNode != nil {
+                       ms.sampled.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to pick an available node")
+                       reply(measure, ms.sampled)
+                       continue
+               }
+               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeId, iwr)
                _, errWritePub := publisher.Publish(data.TopicMeasureWrite, 
message)
                if errWritePub != nil {
                        ms.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to send a message")
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
new file mode 100644
index 00000000..dff0ca22
--- /dev/null
+++ b/banyand/liaison/grpc/node.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/node"
+)
+
+// Compile-time assertions that *clusterNodeService satisfies both
+// schema.EventHandler and NodeRegistry.
+var (
+       _ schema.EventHandler = (*clusterNodeService)(nil)
+       _ NodeRegistry        = (*clusterNodeService)(nil)
+)
+
+// NodeRegistry resolves the id of the node that should handle a shard of the
+// resource identified by group and name.
+type NodeRegistry interface {
+       // Locate returns the node id chosen for group/name and shardId, or an
+       // error when no node can be chosen.
+       Locate(group, name string, shardId uint32) (string, error)
+}
+
+// clusterNodeService is a NodeRegistry that delegates node selection to a
+// node.Selector, which it updates from the schema events it receives.
+type clusterNodeService struct {
+       // metaRepo is the metadata repository this service registers itself with.
+       metaRepo metadata.Repo
+       // sel tracks the known nodes and picks one for each Locate call.
+       sel      node.Selector
+}
+
+// NewClusterNodeRegistry builds a NodeRegistry backed by a pick-first node
+// selector and registers it with metaRepo as the handler named
+// "cluster-node-service" for schema.KindNode events.
+func NewClusterNodeRegistry(metaRepo metadata.Repo) NodeRegistry {
+       registry := &clusterNodeService{metaRepo: metaRepo, sel: node.NewPickFirstSelector()}
+       registry.metaRepo.RegisterHandler("cluster-node-service", schema.KindNode, registry)
+       return registry
+}
+
+// Locate asks the selector to pick a node for the given group, name and shard.
+// When the selector fails, an empty node id is returned together with the
+// selector's error wrapped with the requested target.
+func (n *clusterNodeService) Locate(group, name string, shardId uint32) (string, error) {
+       picked, pickErr := n.sel.Pick(group, name, shardId)
+       if pickErr == nil {
+               return picked, nil
+       }
+       return "", errors.Wrapf(pickErr, "fail to locate %s/%s(%d)", group, name, shardId)
+}
+
+// OnAddOrUpdate passes the name carried by metadata to the selector's AddNode
+// when the event is of kind schema.KindNode; events of any other kind are
+// ignored.
+func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) {
+       if metadata.Kind != schema.KindNode {
+               return
+       }
+       n.sel.AddNode(metadata.Name)
+}
+
+// OnDelete passes the name carried by metadata to the selector's RemoveNode
+// when the event is of kind schema.KindNode; events of any other kind are
+// ignored.
+func (n *clusterNodeService) OnDelete(metadata schema.Metadata) {
+       if metadata.Kind != schema.KindNode {
+               return
+       }
+       n.sel.RemoveNode(metadata.Name)
+}
+
+// localNodeService is a stateless NodeRegistry whose Locate always answers
+// with the node id "local".
+type localNodeService struct{}
+
+// NewLocalNodeRegistry returns a NodeRegistry whose Locate always yields the
+// fixed node id "local".
+func NewLocalNodeRegistry() NodeRegistry {
+       var registry localNodeService
+       return registry
+}
+
+// Locate of localNodeService always returns the fixed node id "local" and a
+// nil error; the group, name and shard arguments are ignored.
+func (localNodeService) Locate(_group, _name string, _shardId uint32) (string, error) {
+       const localNodeID = "local"
+       return localNodeID, nil
+}
diff --git a/banyand/liaison/grpc/registry_test.go 
b/banyand/liaison/grpc/registry_test.go
index 6c86d780..6dff21fa 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -178,7 +178,7 @@ func setupForRegistry() func() {
        metaSvc, err := metadata.NewService(context.TODO())
        Expect(err).NotTo(HaveOccurred())
 
-       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
+       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index cc7e0817..ba98016e 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -91,12 +91,12 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry 
metadata.Repo) Server {
+func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry 
metadata.Repo, nodeRegistry NodeRegistry) Server {
        streamSVC := &streamService{
-               discoveryService: newDiscoveryService(pipeline, 
schema.KindStream, schemaRegistry),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindStream, schemaRegistry, nodeRegistry),
        }
        measureSVC := &measureService{
-               discoveryService: newDiscoveryService(pipeline, 
schema.KindMeasure, schemaRegistry),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindMeasure, schemaRegistry, nodeRegistry),
        }
        s := &server{
                pipeline:   pipeline,
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e4cc1ed1..06b78112 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -103,7 +103,13 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                if s.log.Debug().Enabled() {
                        iwr.EntityValues = tagValues.Encode()
                }
-               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "TODO", iwr)
+               nodeId, errPickNode := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID))
+               if errPickNode != nil {
+                       s.sampled.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to pick an available node")
+                       reply(stream, s.sampled)
+                       continue
+               }
+               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeId, iwr)
                _, errWritePub := publisher.Publish(data.TopicStreamWrite, 
message)
                if errWritePub != nil {
                        s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to send a message")
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
new file mode 100644
index 00000000..a12c36e0
--- /dev/null
+++ b/pkg/node/interface.go
@@ -0,0 +1,86 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package node provides a node selector for the liaison.
+package node
+
+import (
+       "sync"
+
+       "github.com/pkg/errors"
+       "golang.org/x/exp/slices"
+)
+
+var (
+       // Compile-time assertion that *pickFirstSelector satisfies Selector.
+       _ Selector = (*pickFirstSelector)(nil)
+
+       // ErrNoAvailableNode is returned by Pick when the selector holds no node.
+       ErrNoAvailableNode = errors.New("selector: no available node")
+)
+
+// Selector keeps a set of node ids and picks one of them for a shard of the
+// resource identified by group and name.
+type Selector interface {
+       // AddNode should make nodeId available for selection.
+       AddNode(nodeId string)
+       // RemoveNode should make nodeId unavailable for selection.
+       RemoveNode(nodeId string)
+       // Pick returns the node id chosen for group/name and shardId, or an
+       // error when no node can be chosen.
+       Pick(group, name string, shardId uint32) (string, error)
+}
+
+// NewPickFirstSelector returns a Selector that always picks the first node id
+// in sorted order. The membership map is allocated here because inserting
+// into a nil map panics.
+func NewPickFirstSelector() Selector {
+       return &pickFirstSelector{
+               nodeIdMap: make(map[string]struct{}),
+       }
+}
+
+// pickFirstSelector always picks the first node in the sorted node ids list.
+type pickFirstSelector struct {
+       // nodeIds holds the node ids; AddNode sorts it after every append.
+       nodeIds   []string
+       // nodeIdMap is consulted for membership checks on node ids.
+       nodeIdMap map[string]struct{}
+       // mu guards nodeIds and nodeIdMap.
+       mu        sync.RWMutex
+}
+
+// AddNode registers nodeId with the selector and keeps the id list sorted so
+// that Pick is deterministic. Adding an id that is already known is a no-op.
+func (p *pickFirstSelector) AddNode(nodeId string) {
+       // Check and insert under one write lock: releasing a read lock before
+       // taking the write lock would let two callers both pass the membership
+       // check and append the same id twice.
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if _, ok := p.nodeIdMap[nodeId]; ok {
+               return
+       }
+       if p.nodeIdMap == nil {
+               // A zero-value selector has a nil map; writing to it would panic.
+               p.nodeIdMap = make(map[string]struct{})
+       }
+       p.nodeIdMap[nodeId] = struct{}{}
+       p.nodeIds = append(p.nodeIds, nodeId)
+       slices.Sort(p.nodeIds)
+}
+
+// RemoveNode unregisters nodeId from the selector so that Pick no longer
+// returns it. Removing an id that is not known is a no-op.
+func (p *pickFirstSelector) RemoveNode(nodeId string) {
+       // Check and delete under one write lock so the membership check cannot
+       // go stale between the lookup and the mutation.
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if _, ok := p.nodeIdMap[nodeId]; !ok {
+               return
+       }
+       delete(p.nodeIdMap, nodeId)
+       // Deleting one element preserves the relative order of the rest, so the
+       // list does not need to be sorted again.
+       if i := slices.Index(p.nodeIds, nodeId); i >= 0 {
+               p.nodeIds = slices.Delete(p.nodeIds, i, i+1)
+       }
+}
+
+// Pick returns the first entry of the node id list regardless of group, name
+// and shardId, or ErrNoAvailableNode when the list is empty.
+func (p *pickFirstSelector) Pick(group, name string, shardId uint32) (string, error) {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+       if len(p.nodeIds) > 0 {
+               return p.nodeIds[0], nil
+       }
+       return "", ErrNoAvailableNode
+}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index e7fb94c7..281ffe8a 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -92,7 +92,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) 
func() {
        // Init `Query` module
        q, err := query.NewService(context.TODO(), streamSvc, measureSvc, 
metaSvc, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
+       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        httpServer := http.NewServer()
 
        units := []run.Unit{

Reply via email to