This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new bd8384ca Assign a separate lookup table to each group (#482)
bd8384ca is described below

commit bd8384ca5f66838f51a878668ef2a3785a806565
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Jul 1 17:16:30 2024 +0800

    Assign a separate lookup table to each group (#482)
---
 CHANGES.md              |  1 +
 pkg/cmdsetup/liaison.go |  5 +---
 pkg/node/interface.go   |  5 ----
 pkg/node/maglev.go      | 69 ++++++++++++++++++++++++++++++++++++++++---------
 pkg/node/maglev_test.go | 12 ++++-----
 5 files changed, 64 insertions(+), 28 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 82ca6e08..408a8b29 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - Check unregistered nodes in background.
 - Improve sorting performance of stream.
 - Add the measure query trace.
+- Assign a separate lookup table to each group in the maglev selector.
 
 ### Bugs
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index a7112e94..67cd942a 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,10 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        }
        pipeline := pub.New(metaSvc)
        localPipeline := queue.Local()
-       nodeSel, err := node.NewMaglevSelector()
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate required node selector")
-       }
+       nodeSel := node.NewMaglevSelector()
        nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
        grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, nodeRegistry)
        profSvc := observability.NewProfService()
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 7745c9ad..26ec5cc2 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -19,7 +19,6 @@
 package node
 
 import (
-       "strconv"
        "sync"
 
        "github.com/pkg/errors"
@@ -97,7 +96,3 @@ func (p *pickFirstSelector) Pick(_, _ string, _ uint32) (string, error) {
        }
        return p.nodeIDs[0], nil
 }
-
-func formatSearchKey(group, name string, shardID uint32) string {
-       return group + "/" + name + "#" + strconv.FormatUint(uint64(shardID), 10)
-}
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index deb140f2..fea2c5b2 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -18,36 +18,81 @@
 package node
 
 import (
+       "sort"
+       "strconv"
+       "sync"
+
        "github.com/kkdai/maglev"
 
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 )
 
+const lookupTableSize = 65537
+
 var _ Selector = (*maglevSelector)(nil)
 
 type maglevSelector struct {
-       maglev *maglev.Maglev
+       routers sync.Map
+       nodes   []string
+       mutex   sync.RWMutex
 }
 
 func (m *maglevSelector) AddNode(node *databasev1.Node) {
-       _ = m.maglev.Add(node.GetMetadata().GetName())
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+       for i := range m.nodes {
+               if m.nodes[i] == node.GetMetadata().GetName() {
+                       return
+               }
+       }
+       m.nodes = append(m.nodes, node.GetMetadata().GetName())
+       sort.StringSlice(m.nodes).Sort()
+       m.routers.Range(func(_, value any) bool {
+               _ = value.(*maglev.Maglev).Set(m.nodes)
+               return true
+       })
 }
 
 func (m *maglevSelector) RemoveNode(node *databasev1.Node) {
-       _ = m.maglev.Remove(node.GetMetadata().GetName())
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+       for i := range m.nodes {
+               if m.nodes[i] == node.GetMetadata().GetName() {
+                       m.nodes = append(m.nodes[:i], m.nodes[i+1:]...)
+                       break
+               }
+       }
+       m.routers.Range(func(_, value any) bool {
+               _ = value.(*maglev.Maglev).Set(m.nodes)
+               return true
+       })
 }
 
 func (m *maglevSelector) Pick(group, name string, shardID uint32) (string, error) {
-       return m.maglev.Get(formatSearchKey(group, name, shardID))
-}
+       router, ok := m.routers.Load(group)
+       if ok {
+               return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID))
+       }
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+       router, ok = m.routers.Load(group)
+       if ok {
+               return router.(*maglev.Maglev).Get(formatSearchKey(name, shardID))
+       }
 
-// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm.
-func NewMaglevSelector() (Selector, error) {
-       alg, err := maglev.NewMaglev(nil, 65537)
+       mTab, err := maglev.NewMaglev(m.nodes, lookupTableSize)
        if err != nil {
-               return nil, err
+               return "", err
        }
-       return &maglevSelector{
-               maglev: alg,
-       }, nil
+       m.routers.Store(group, mTab)
+       return mTab.Get(formatSearchKey(name, shardID))
+}
+
+// NewMaglevSelector creates a new backend selector based on Maglev hashing algorithm.
+func NewMaglevSelector() Selector {
+       return &maglevSelector{}
+}
+
+func formatSearchKey(name string, shardID uint32) string {
+       return name + "-" + strconv.FormatUint(uint64(shardID), 10)
 }
diff --git a/pkg/node/maglev_test.go b/pkg/node/maglev_test.go
index d581dd03..74505a3a 100644
--- a/pkg/node/maglev_test.go
+++ b/pkg/node/maglev_test.go
@@ -34,8 +34,7 @@ const (
 )
 
 func TestMaglevSelector(t *testing.T) {
-       sel, err := NewMaglevSelector()
-       assert.NoError(t, err)
+       sel := NewMaglevSelector()
        sel.AddNode(&databasev1.Node{
                Metadata: &commonv1.Metadata{
                        Name: "data-node-1",
@@ -55,8 +54,7 @@ func TestMaglevSelector(t *testing.T) {
 }
 
 func TestMaglevSelector_EvenDistribution(t *testing.T) {
-       sel, err := NewMaglevSelector()
-       assert.NoError(t, err)
+       sel := NewMaglevSelector()
        dataNodeNum := 10
        for i := 0; i < dataNodeNum; i++ {
                sel.AddNode(&databasev1.Node{
@@ -83,8 +81,8 @@ func TestMaglevSelector_EvenDistribution(t *testing.T) {
 }
 
 func TestMaglevSelector_DiffNode(t *testing.T) {
-       fullSel, _ := NewMaglevSelector()
-       brokenSel, _ := NewMaglevSelector()
+       fullSel := NewMaglevSelector()
+       brokenSel := NewMaglevSelector()
        dataNodeNum := 10
        for i := 0; i < dataNodeNum; i++ {
                fullSel.AddNode(&databasev1.Node{
@@ -114,7 +112,7 @@ func TestMaglevSelector_DiffNode(t *testing.T) {
 }
 
 func BenchmarkMaglevSelector_Pick(b *testing.B) {
-       sel, _ := NewMaglevSelector()
+       sel := NewMaglevSelector()
        dataNodeNum := 10
        for i := 0; i < dataNodeNum; i++ {
                sel.AddNode(&databasev1.Node{

Reply via email to