This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new a6334b3 Add support of migrating with slot range (#304)
a6334b3 is described below
commit a6334b3812cd68bcac4d07dd17441325a272c4d1
Author: Byron Seto <[email protected]>
AuthorDate: Sat May 3 07:57:25 2025 -0600
Add support of migrating with slot range (#304)
---
cmd/client/command/helper.go | 4 +-
cmd/client/command/migrate.go | 16 +--
consts/errors.go | 30 +++--
controller/cluster.go | 21 ++-
controller/cluster_test.go | 6 +-
server/api/cluster.go | 6 +-
server/api/cluster_test.go | 16 ++-
server/api/shard_test.go | 11 +-
store/cluster.go | 18 +--
store/cluster_node.go | 14 +-
store/cluster_shard.go | 12 +-
store/cluster_shard_test.go | 8 +-
store/cluster_test.go | 17 +--
store/slot.go | 163 +++++++++++++----------
store/slot_test.go | 292 ++++++++++++++++++++++++++++++++++--------
15 files changed, 434 insertions(+), 200 deletions(-)
diff --git a/cmd/client/command/helper.go b/cmd/client/command/helper.go
index 1d07677..1d395ac 100644
--- a/cmd/client/command/helper.go
+++ b/cmd/client/command/helper.go
@@ -51,8 +51,8 @@ func printCluster(cluster *store.Cluster) {
role = strings.ToUpper(store.RoleMaster)
}
migratingStatus := "NO"
- if shard.MigratingSlot != -1 {
- migratingStatus = fmt.Sprintf("%d --> %d",
shard.MigratingSlot, shard.TargetShardIndex)
+ if shard.MigratingSlot != nil {
+ migratingStatus = fmt.Sprintf("%s --> %d",
shard.MigratingSlot, shard.TargetShardIndex)
}
columns := []string{fmt.Sprintf("%d", i), node.ID(),
node.Addr(), role, migratingStatus}
writer.Append(columns)
diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go
index c8ccbac..009b416 100644
--- a/cmd/client/command/migrate.go
+++ b/cmd/client/command/migrate.go
@@ -26,13 +26,14 @@ import (
"strconv"
"strings"
+ "github.com/apache/kvrocks-controller/store"
"github.com/spf13/cobra"
)
type MigrationOptions struct {
namespace string
cluster string
- slot int
+ slot string
target int
slotOnly bool
}
@@ -69,14 +70,11 @@ func migrationPreRun(_ *cobra.Command, args []string) error
{
if len(args) < 2 {
return fmt.Errorf("the slot number should be specified")
}
- slot, err := strconv.Atoi(args[1])
+ _, err := store.ParseSlotRange(args[1])
if err != nil {
- return fmt.Errorf("invalid slot number: %s", args[1])
+ return fmt.Errorf("invalid slot number: %s, error: %w",
args[1], err)
}
- if slot < 0 || slot > 16383 {
- return errors.New("slot number should be in range [0, 16383]")
- }
- migrateOptions.slot = slot
+ migrateOptions.slot = args[1]
if migrateOptions.namespace == "" {
return fmt.Errorf("namespace is required, please specify with
-n or --namespace")
@@ -106,12 +104,12 @@ func migrateSlot(client *client, options
*MigrationOptions) error {
if rsp.IsError() {
return errors.New(rsp.String())
}
- printLine("migrate slot[%d] task is submitted successfully.",
options.slot)
+ printLine("migrate slot[%s] task is submitted successfully.",
options.slot)
return nil
}
func init() {
- MigrateCommand.Flags().IntVar(&migrateOptions.slot, "slot", -1, "The
slot to migrate")
+ MigrateCommand.Flags().StringVar(&migrateOptions.slot, "slot", "", "The
slot to migrate")
MigrateCommand.Flags().IntVar(&migrateOptions.target, "target", -1,
"The target node")
MigrateCommand.Flags().StringVarP(&migrateOptions.namespace,
"namespace", "n", "", "The namespace")
MigrateCommand.Flags().StringVarP(&migrateOptions.cluster, "cluster",
"c", "", "The cluster")
diff --git a/consts/errors.go b/consts/errors.go
index 8ad1034..bf7e51d 100644
--- a/consts/errors.go
+++ b/consts/errors.go
@@ -23,18 +23,20 @@ package consts
import "errors"
var (
- ErrInvalidArgument = errors.New("invalid argument")
- ErrNotFound = errors.New("not found")
- ErrForbidden = errors.New("forbidden")
- ErrAlreadyExists = errors.New("already exists")
- ErrIndexOutOfRange = errors.New("index out of range")
- ErrShardIsSame = errors.New("source and target shard is
same")
- ErrSlotOutOfRange = errors.New("slot out of range")
- ErrSlotNotBelongToAnyShard = errors.New("slot not belong to any shard")
- ErrNodeIsNotMaster = errors.New("the old node is not master")
- ErrOldMasterNodeNotFound = errors.New("old master node not found")
- ErrShardNoReplica = errors.New("no replica in shard")
- ErrShardIsServicing = errors.New("shard is servicing")
- ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
- ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
+ ErrInvalidArgument = errors.New("invalid argument")
+ ErrNotFound = errors.New("not found")
+ ErrForbidden = errors.New("forbidden")
+ ErrAlreadyExists = errors.New("already exists")
+ ErrIndexOutOfRange = errors.New("index out of range")
+ ErrShardIsSame = errors.New("source and target
shard is same")
+ ErrSlotOutOfRange = errors.New("slot out of range")
+ ErrSlotNotBelongToAnyShard = errors.New("slot not belong to
any shard")
+ ErrSlotRangeBelongsToMultipleShards = errors.New("slot range belongs to
multiple shards")
+ ErrNodeIsNotMaster = errors.New("the old node is not
master")
+ ErrOldMasterNodeNotFound = errors.New("old master node not
found")
+ ErrShardNoReplica = errors.New("no replica in shard")
+ ErrShardIsServicing = errors.New("shard is servicing")
+ ErrShardSlotIsMigrating = errors.New("shard slot is
migrating")
+ ErrShardNoMatchNewMaster = errors.New("no match new master
in shard")
+ ErrSlotStartAndStopEqual = errors.New("start and stop of a
range cannot be equal")
)
diff --git a/controller/cluster.go b/controller/cluster.go
index cf60b9c..2d6802e 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -324,10 +324,16 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
log.Error("Failed to get the cluster info from the
source node", zap.Error(err))
return
}
- if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
+ if sourceNodeClusterInfo.MigratingSlot == nil {
+ log.Error("The source migration slot is empty",
+ zap.String("migrating_slot",
shard.MigratingSlot.String()),
+ )
+ return
+ }
+ if
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
log.Error("Mismatch migrating slot",
- zap.Int("source_migrating_slot",
sourceNodeClusterInfo.MigratingSlot),
- zap.Int("migrating_slot", shard.MigratingSlot),
+ zap.String("source_migrating_slot",
sourceNodeClusterInfo.MigratingSlot.String()),
+ zap.String("migrating_slot",
shard.MigratingSlot.String()),
)
return
}
@@ -347,17 +353,18 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, clonedClu
return
}
c.updateCluster(clonedCluster)
- log.Warn("Failed to migrate the slot", zap.Int("slot",
migratingSlot))
+ log.Warn("Failed to migrate the slot",
zap.String("slot", migratingSlot.String()))
case "success":
- clonedCluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges,
shard.MigratingSlot)
+ clonedCluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges,
*shard.MigratingSlot)
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges
= store.AddSlotToSlotRanges(
-
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
+
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
+ )
migratedSlot := shard.MigratingSlot
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.UpdateCluster(ctx,
c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
} else {
- log.Info("Migrate the slot successfully",
zap.Int("slot", migratedSlot))
+ log.Info("Migrate the slot successfully",
zap.String("slot", migratedSlot.String()))
}
c.updateCluster(clonedCluster)
default:
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index 2fd7db7..d415f3f 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode0, mockNode1, mockNode2, mockNode3,
},
SlotRanges: []store.SlotRange{{Start: 0, Stop:
16383}},
- MigratingSlot: -1,
+ MigratingSlot: nil,
TargetShardIndex: -1,
}},
}
@@ -219,7 +219,9 @@ func TestCluster_MigrateSlot(t *testing.T) {
defer func() {
require.NoError(t, cluster.Reset(ctx))
}()
- require.NoError(t, cluster.MigrateSlot(ctx, 0, 1, false))
+ slotRange, err := store.NewSlotRange(0, 0)
+ require.NoError(t, err)
+ require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false))
s := NewMockClusterStore()
require.NoError(t, s.CreateCluster(ctx, ns, cluster))
diff --git a/server/api/cluster.go b/server/api/cluster.go
index 3d1e896..b539c90 100644
--- a/server/api/cluster.go
+++ b/server/api/cluster.go
@@ -32,9 +32,9 @@ import (
)
type MigrateSlotRequest struct {
- Target int `json:"target" validate:"required"`
- Slot int `json:"slot" validate:"required"`
- SlotOnly bool `json:"slot_only"`
+ Target int `json:"target" validate:"required"`
+ Slot store.SlotRange `json:"slot" validate:"required"`
+ SlotOnly bool `json:"slot_only"`
}
type CreateClusterRequest struct {
diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go
index 53995ba..b1f1761 100644
--- a/server/api/cluster_test.go
+++ b/server/api/cluster_test.go
@@ -126,8 +126,10 @@ func TestClusterBasics(t *testing.T) {
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: clusterName}}
+ slotRange, err := store.NewSlotRange(3, 3)
+ require.NoError(t, err)
testMigrateReq := &MigrateSlotRequest{
- Slot: 3,
+ Slot: *slotRange,
SlotOnly: true,
Target: 1,
}
@@ -163,7 +165,6 @@ func TestClusterBasics(t *testing.T) {
runRemove(t, "test-cluster", http.StatusNoContent)
runRemove(t, "not-exist", http.StatusNotFound)
})
-
}
func TestClusterImport(t *testing.T) {
@@ -233,8 +234,10 @@ func TestClusterMigrateData(t *testing.T) {
reqCtx := GetTestContext(recorder)
reqCtx.Set(consts.ContextKeyStore, handler.s)
reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key:
"cluster", Value: clusterName}}
+ slotRange, err := store.NewSlotRange(0, 0)
+ require.NoError(t, err)
testMigrateReq := &MigrateSlotRequest{
- Slot: 0,
+ Slot: *slotRange,
Target: 1,
}
body, err := json.Marshal(testMigrateReq)
@@ -248,14 +251,15 @@ func TestClusterMigrateData(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 1, gotCluster.Version.Load())
require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
- require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
+ require.EqualValues(t, &store.SlotRange{Start: 0, Stop: 0},
gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)
ctrl, err := controller.New(handler.s.(*store.ClusterStore),
&config.ControllerConfig{
FailOver: &config.FailOverConfig{
PingIntervalSeconds: 1,
MaxPingCount: 3,
- }})
+ },
+ })
require.NoError(t, err)
require.NoError(t, ctrl.Start(ctx))
ctrl.WaitForReady()
@@ -268,6 +272,6 @@ func TestClusterMigrateData(t *testing.T) {
if err != nil {
return false
}
- return gotCluster.Shards[0].MigratingSlot == -1
+ return gotCluster.Shards[0].MigratingSlot == nil
}, 10*time.Second, 100*time.Millisecond)
}
diff --git a/server/api/shard_test.go b/server/api/shard_test.go
index c4cd642..cdaa89b 100644
--- a/server/api/shard_test.go
+++ b/server/api/shard_test.go
@@ -84,7 +84,8 @@ func TestShardBasics(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
- {Key: "shard", Value: strconv.Itoa(shardIndex)}}
+ {Key: "shard", Value: strconv.Itoa(shardIndex)},
+ }
middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
@@ -103,7 +104,8 @@ func TestShardBasics(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
- {Key: "shard", Value: "1"}}
+ {Key: "shard", Value: "1"},
+ }
middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
@@ -124,7 +126,7 @@ func TestShardBasics(t *testing.T) {
nodeAddrs = append(nodeAddrs, node.Addr())
}
require.ElementsMatch(t, []string{"127.0.0.1:1235",
"127.0.0.1:1236"}, nodeAddrs)
- require.EqualValues(t, -1, rsp.Data.Shard.MigratingSlot)
+ require.Nil(t, rsp.Data.Shard.MigratingSlot)
require.EqualValues(t, -1, rsp.Data.Shard.TargetShardIndex)
})
@@ -172,7 +174,8 @@ func TestClusterFailover(t *testing.T) {
ctx.Params = []gin.Param{
{Key: "namespace", Value: ns},
{Key: "cluster", Value: clusterName},
- {Key: "shard", Value: strconv.Itoa(shardIndex)}}
+ {Key: "shard", Value: strconv.Itoa(shardIndex)},
+ }
middleware.RequiredClusterShard(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
diff --git a/store/cluster.go b/store/cluster.go
index 3bde499..b4bfd9e 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -132,7 +132,8 @@ func (cluster *Cluster) RemoveNode(shardIndex int, nodeID
string) error {
}
func (cluster *Cluster) PromoteNewMaster(ctx context.Context,
- shardIdx int, masterNodeID, preferredNodeID string) (string, error) {
+ shardIdx int, masterNodeID, preferredNodeID string,
+) (string, error) {
shard, err := cluster.GetShard(shardIdx)
if err != nil {
return "", err
@@ -175,17 +176,16 @@ func (cluster *Cluster) Reset(ctx context.Context) error {
return nil
}
-func (cluster *Cluster) findShardIndexBySlot(slot int) (int, error) {
- if slot < 0 || slot > MaxSlotID {
- return -1, consts.ErrSlotOutOfRange
- }
+func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) {
sourceShardIdx := -1
for i := 0; i < len(cluster.Shards); i++ {
slotRanges := cluster.Shards[i].SlotRanges
for _, slotRange := range slotRanges {
- if slotRange.Contains(slot) {
+ if slotRange.HasOverlap(&slot) {
+ if sourceShardIdx != -1 {
+ return sourceShardIdx,
consts.ErrSlotRangeBelongsToMultipleShards
+ }
sourceShardIdx = i
- break
}
}
}
@@ -195,7 +195,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot int)
(int, error) {
return sourceShardIdx, nil
}
-func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int,
targetShardIdx int, slotOnly bool) error {
+func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange,
targetShardIdx int, slotOnly bool) error {
if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) {
return consts.ErrIndexOutOfRange
}
@@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context,
slot int, targetShardId
}
// Will start the data migration in the background
- cluster.Shards[sourceShardIdx].MigratingSlot = slot
+ cluster.Shards[sourceShardIdx].MigratingSlot = &slot
cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
return nil
}
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 3434595..e16ae83 100644
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -68,7 +68,7 @@ type Node interface {
GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
SyncClusterInfo(ctx context.Context, cluster *Cluster) error
CheckClusterMode(ctx context.Context) (int64, error)
- MigrateSlot(ctx context.Context, slot int, NodeID string) error
+ MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error
MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error
@@ -85,9 +85,9 @@ type ClusterNode struct {
}
type ClusterInfo struct {
- CurrentEpoch int64 `json:"cluster_current_epoch"`
- MigratingSlot int `json:"migrating_slot"`
- MigratingState string `json:"migrating_state"`
+ CurrentEpoch int64 `json:"cluster_current_epoch"`
+ MigratingSlot *SlotRange `json:"migrating_slot"`
+ MigratingState string `json:"migrating_state"`
}
type ClusterNodeInfo struct {
@@ -195,7 +195,7 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context)
(*ClusterInfo, error)
}
case "migrating_slot", "migrating_slot(s)":
// TODO(@git-hulk): handle multiple migrating slots
- clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1])
+ clusterInfo.MigratingSlot, err =
ParseSlotRange(fields[1])
if err != nil {
return nil, err
}
@@ -257,8 +257,8 @@ func (n *ClusterNode) Reset(ctx context.Context) error {
return n.GetClient().ClusterResetHard(ctx).Err()
}
-func (n *ClusterNode) MigrateSlot(ctx context.Context, slot int, targetNodeID
string) error {
- return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot,
targetNodeID).Err()
+func (n *ClusterNode) MigrateSlot(ctx context.Context, slot SlotRange,
targetNodeID string) error {
+ return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot.String(),
targetNodeID).Err()
}
func (n *ClusterNode) MarshalJSON() ([]byte, error) {
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index 1de4dc5..62c8826 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -37,7 +37,7 @@ type Shard struct {
Nodes []Node `json:"nodes"`
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
- MigratingSlot int `json:"migrating_slot"`
+ MigratingSlot *SlotRange `json:"migrating_slot"`
}
type Shards []*Shard
@@ -45,9 +45,11 @@ type Shards []*Shard
func (s Shards) Len() int {
return len(s)
}
+
func (s Shards) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
+
func (s Shards) Less(i, j int) bool {
if len(s[i].SlotRanges) == 0 {
return false
@@ -61,7 +63,7 @@ func NewShard() *Shard {
return &Shard{
Nodes: make([]Node, 0),
SlotRanges: make([]SlotRange, 0),
- MigratingSlot: -1,
+ MigratingSlot: nil,
TargetShardIndex: -1,
}
}
@@ -78,7 +80,7 @@ func (shard *Shard) Clone() *Shard {
}
func (shard *Shard) ClearMigrateState() {
- shard.MigratingSlot = -1
+ shard.MigratingSlot = nil
shard.TargetShardIndex = -1
}
@@ -110,7 +112,7 @@ func (shard *Shard) addNode(addr, role, password string)
error {
}
func (shard *Shard) IsMigrating() bool {
- return shard.MigratingSlot != -1 && shard.TargetShardIndex != -1
+ return shard.MigratingSlot != nil && shard.TargetShardIndex != -1
}
func (shard *Shard) GetMasterNode() Node {
@@ -259,7 +261,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error {
var data struct {
SlotRanges []SlotRange `json:"slot_ranges"`
TargetShardIndex int `json:"target_shard_index"`
- MigratingSlot int `json:"migrating_slot"`
+ MigratingSlot *SlotRange `json:"migrating_slot"`
Nodes []*ClusterNode `json:"nodes"`
}
if err := json.Unmarshal(bytes, &data); err != nil {
diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go
index 43ea157..994046f 100644
--- a/store/cluster_shard_test.go
+++ b/store/cluster_shard_test.go
@@ -54,17 +54,19 @@ func TestShard_Sort(t *testing.T) {
}
func TestShard_IsServicing(t *testing.T) {
+ var err error
shard := NewShard()
shard.TargetShardIndex = 0
- shard.MigratingSlot = -1
+ shard.MigratingSlot = nil
require.False(t, shard.IsServicing())
shard.TargetShardIndex = 0
- shard.MigratingSlot = 0
+ shard.MigratingSlot, err = NewSlotRange(1, 1)
+ require.Nil(t, err)
require.True(t, shard.IsServicing())
shard.TargetShardIndex = -1
- shard.MigratingSlot = -1
+ shard.MigratingSlot = nil
shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
require.True(t, shard.IsServicing())
diff --git a/store/cluster_test.go b/store/cluster_test.go
index cb62545..6b6e45c 100644
--- a/store/cluster_test.go
+++ b/store/cluster_test.go
@@ -42,22 +42,23 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) {
cluster, err := NewCluster("test", []string{"node1", "node2", "node3"},
1)
require.NoError(t, err)
- shard, err := cluster.findShardIndexBySlot(0)
+ slotRange, err := NewSlotRange(0, 0)
+ require.NoError(t, err)
+ shard, err := cluster.findShardIndexBySlot(*slotRange)
require.NoError(t, err)
require.Equal(t, 0, shard)
- shard, err = cluster.findShardIndexBySlot(MaxSlotID/3 + 1)
+ slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1)
+ require.NoError(t, err)
+ shard, err = cluster.findShardIndexBySlot(*slotRange)
require.NoError(t, err)
require.Equal(t, 1, shard)
- shard, err = cluster.findShardIndexBySlot(MaxSlotID)
+ slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID)
+ require.NoError(t, err)
+ shard, err = cluster.findShardIndexBySlot(*slotRange)
require.NoError(t, err)
require.Equal(t, 2, shard)
-
- _, err = cluster.findShardIndexBySlot(-1)
- require.ErrorIs(t, err, consts.ErrSlotOutOfRange)
- _, err = cluster.findShardIndexBySlot(MaxSlotID + 1)
- require.ErrorIs(t, err, consts.ErrSlotOutOfRange)
}
func TestCluster_PromoteNewMaster(t *testing.T) {
diff --git a/store/slot.go b/store/slot.go
index 8544aa6..04a2b52 100644
--- a/store/slot.go
+++ b/store/slot.go
@@ -22,9 +22,12 @@ package store
import (
"encoding/json"
"errors"
+ "fmt"
"sort"
"strconv"
"strings"
+
+ "github.com/apache/kvrocks-controller/consts"
)
const (
@@ -35,15 +38,15 @@ const (
var ErrSlotOutOfRange = errors.New("slot id was out of range, should be
between 0 and 16383")
type SlotRange struct {
- Start int `json:"start"`
- Stop int `json:"stop"`
+ Start int `json:"start"` // inclusive
+ Stop int `json:"stop"` // inclusive
}
type SlotRanges []SlotRange
func NewSlotRange(start, stop int) (*SlotRange, error) {
if start > stop {
- return nil, errors.New("start was larger than Shutdown")
+ return nil, errors.New("start was larger than stop")
}
if (start < MinSlotID || start > MaxSlotID) ||
(stop < MinSlotID || stop > MaxSlotID) {
@@ -55,8 +58,21 @@ func NewSlotRange(start, stop int) (*SlotRange, error) {
}, nil
}
+func (slotRange *SlotRange) Equal(that *SlotRange) bool {
+ if that == nil {
+ return false
+ }
+ if slotRange.Start != that.Start {
+ return false
+ }
+ if slotRange.Stop != that.Stop {
+ return false
+ }
+ return true
+}
+
func (slotRange *SlotRange) HasOverlap(that *SlotRange) bool {
- return !(slotRange.Stop < that.Start || slotRange.Start > that.Stop)
+ return slotRange.Stop >= that.Start && slotRange.Start <= that.Stop
}
func (slotRange *SlotRange) Contains(slot int) bool {
@@ -88,6 +104,11 @@ func (slotRange *SlotRange) UnmarshalJSON(data []byte)
error {
}
func ParseSlotRange(s string) (*SlotRange, error) {
+ numberOfRanges := strings.Count(s, "-")
+ if numberOfRanges > 1 {
+ return nil, fmt.Errorf("%w, cannot have more than one range",
consts.ErrInvalidArgument)
+ }
+
index := strings.IndexByte(s, '-')
if index == -1 {
start, err := strconv.Atoi(s)
@@ -112,7 +133,7 @@ func ParseSlotRange(s string) (*SlotRange, error) {
return nil, err
}
if start > stop {
- return nil, errors.New("start slot id greater than Shutdown
slot id")
+ return nil, errors.New("start slot id greater than stop slot
id")
}
if (start < MinSlotID || start > MaxSlotID) ||
(stop < MinSlotID || stop > MaxSlotID) {
@@ -133,87 +154,89 @@ func (SlotRanges *SlotRanges) Contains(slot int) bool {
return false
}
-func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges {
- sort.Slice(source, func(i, j int) bool {
- return source[i].Start < source[j].Start
- })
- if len(source) == 0 {
- return append(source, SlotRange{Start: slot, Stop: slot})
+func (SlotRanges *SlotRanges) HasOverlap(slotRange SlotRange) bool { // reports whether any owned range overlaps slotRange
+ for _, owned := range *SlotRanges { // distinct name: must not shadow the slotRange parameter
+ if owned.HasOverlap(&slotRange) {
+ return true
+ }
}
- if source[0].Start-1 > slot {
- return append([]SlotRange{{Start: slot, Stop: slot}}, source...)
+ return false
+}
+
+// CanMerge reports whether the two slot ranges overlap or are directly adjacent
+func CanMerge(a, b SlotRange) bool {
+ // Ensure a starts before b for easier comparison
+ if a.Start > b.Start {
+ a, b = b, a
}
- if source[len(source)-1].Stop+1 < slot {
- return append(source, SlotRange{Start: slot, Stop: slot})
+ // Mergeable when b starts no later than one slot past the end of a (no gap)
+ return a.Stop+1 >= b.Start
+}
+
+func MergeSlotRanges(a SlotRange, b SlotRange) SlotRange {
+ return SlotRange{
+ Start: min(a.Start, b.Start),
+ Stop: max(a.Stop, b.Stop),
}
+}
- // first run is to find the fittest slot range and create a new one if
necessary
- for i, slotRange := range source {
- if slotRange.Contains(slot) {
- return source
- }
- // check next slot range, it won't be the last one since we
have checked it before
- if slotRange.Stop+1 < slot {
- continue
- }
- if slotRange.Start == slot+1 {
- source[i].Start = slot
- } else if slotRange.Stop == slot-1 {
- source[i].Stop = slot
- } else if slotRange.Start > slot {
- // no suitable slot range, create a new one before the
current slot range
- tmp := make(SlotRanges, len(source)+1)
- copy(tmp, source[0:i])
- tmp[i] = SlotRange{Start: slot, Stop: slot}
- copy(tmp[i+1:], source[i:])
- source = tmp
+// Implemented following leetcode solution:
+//
https://leetcode.com/problems/merge-intervals/solutions/1805268/go-clean-code-with-explanation-and-visual-10ms-100
+func AddSlotToSlotRanges(source SlotRanges, slot SlotRange) SlotRanges {
+ if len(source) == 0 {
+ return append(source, slot)
+ }
+ source = append(source, slot)
+ sort.Slice(source, func(i, j int) bool {
+ return source[i].Start < source[j].Start
+ })
+
+ mergedSlotRanges := make([]SlotRange, 0, len(source))
+ mergedSlotRanges = append(mergedSlotRanges, source[0])
+
+ for _, interval := range source[1:] {
+ lastIntervalPos := len(mergedSlotRanges) - 1
+ lastInterval := mergedSlotRanges[lastIntervalPos]
+ if CanMerge(lastInterval, interval) {
+ mergedSlotRanges[lastIntervalPos] =
MergeSlotRanges(interval, lastInterval)
} else {
- // should not reach here
- panic("should not reach here")
- }
- break
- }
- // merge the slot ranges if necessary
- for i := 0; i < len(source)-1; i++ {
- if source[i].Stop+1 == source[i+1].Start {
- source[i].Stop = source[i+1].Stop
- if i+1 == len(source)-1 {
- // remove the last slot range
- source = source[:i+1]
- } else {
- source = append(source[:i+1], source[i+2:]...)
- }
+ mergedSlotRanges = append(mergedSlotRanges, interval)
}
}
- return source
+
+ return mergedSlotRanges
}
-func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges {
+func RemoveSlotFromSlotRanges(source SlotRanges, slot SlotRange) SlotRanges {
sort.Slice(source, func(i, j int) bool {
return source[i].Start < source[j].Start
})
- if !source.Contains(slot) {
+ if !source.HasOverlap(slot) {
return source
}
- for i, slotRange := range source {
- if slotRange.Contains(slot) {
- if slotRange.Start == slot && slotRange.Stop == slot {
- source = append(source[0:i], source[i+1:]...)
- } else if slotRange.Start == slot {
- source[i].Start = slot + 1
- } else if slotRange.Stop == slot {
- source[i].Stop = slot - 1
- } else {
- tmp := make(SlotRanges, len(source)+1)
- copy(tmp, source[0:i])
- tmp[i] = SlotRange{Start: slotRange.Start,
Stop: slot - 1}
- tmp[i+1] = SlotRange{Start: slot + 1, Stop:
slotRange.Stop}
- copy(tmp[i+2:], source[i+1:])
- source = tmp
- }
+
+ result := make([]SlotRange, 0, len(source))
+ for _, slotRange := range source {
+ // if no overlap, keep original range
+ if !slotRange.HasOverlap(&slot) {
+ result = append(result, slotRange)
+ continue
+ }
+ // if overlap, then we need to create a new left and right range
+ if slotRange.Start < slot.Start {
+ result = append(result, SlotRange{
+ Start: slotRange.Start,
+ Stop: slot.Start - 1,
+ })
+ }
+ if slotRange.Stop > slot.Stop {
+ result = append(result, SlotRange{
+ Start: slot.Stop + 1,
+ Stop: slotRange.Stop,
+ })
}
}
- return source
+ return result
}
func CalculateSlotRanges(n int) SlotRanges {
diff --git a/store/slot_test.go b/store/slot_test.go
index f5740af..9158198 100644
--- a/store/slot_test.go
+++ b/store/slot_test.go
@@ -22,6 +22,7 @@ package store
import (
"testing"
+ "github.com/apache/kvrocks-controller/consts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -47,6 +48,16 @@ func TestSlotRange_Parse(t *testing.T) {
assert.Equal(t, 1, sr.Start)
assert.Equal(t, 12, sr.Stop)
+ sr, err = ParseSlotRange("5")
+ assert.Nil(t, err)
+ assert.Equal(t, 5, sr.Start)
+ assert.Equal(t, 5, sr.Stop)
+
+ sr, err = ParseSlotRange("0")
+ assert.Nil(t, err)
+ assert.Equal(t, 0, sr.Start)
+ assert.Equal(t, 0, sr.Stop)
+
_, err = ParseSlotRange("1-65536")
assert.Equal(t, ErrSlotOutOfRange, err)
@@ -55,6 +66,12 @@ func TestSlotRange_Parse(t *testing.T) {
_, err = ParseSlotRange("12-1")
assert.NotNil(t, err)
+
+ _, err = ParseSlotRange("1-12 5-10")
+ require.ErrorIs(t, err, consts.ErrInvalidArgument)
+
+ _, err = ParseSlotRange("1-12, 5")
+ assert.NotNil(t, err)
}
func TestAddSlotToSlotRanges(t *testing.T) {
@@ -63,25 +80,35 @@ func TestAddSlotToSlotRanges(t *testing.T) {
{Start: 101, Stop: 199},
{Start: 201, Stop: 300},
}
- slotRanges = AddSlotToSlotRanges(slotRanges, 0)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0])
+ slotRange, err := NewSlotRange(0, 0)
+ require.NoError(t, err)
+ slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0],
slotRanges)
- slotRanges = AddSlotToSlotRanges(slotRanges, 21)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0])
+ slotRange, err = NewSlotRange(21, 21)
+ require.NoError(t, err)
+ slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0],
slotRanges)
- slotRanges = AddSlotToSlotRanges(slotRanges, 50)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1])
+ slotRange, err = NewSlotRange(50, 50)
+ require.NoError(t, err)
+ slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1],
slotRanges)
- slotRanges = AddSlotToSlotRanges(slotRanges, 200)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2])
+ slotRange, err = NewSlotRange(200, 200)
+ require.NoError(t, err)
+ slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2],
slotRanges)
- slotRanges = AddSlotToSlotRanges(slotRanges, 400)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3])
+ slotRange, err = NewSlotRange(400, 400)
+ require.NoError(t, err)
+ slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3],
slotRanges)
}
func TestRemoveSlotRanges(t *testing.T) {
@@ -90,42 +117,60 @@ func TestRemoveSlotRanges(t *testing.T) {
{Start: 101, Stop: 199},
{Start: 201, Stop: 300},
}
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20)
- require.Equal(t, 3, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298)
- require.Equal(t, 5, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
- require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4])
-
- slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299)
- require.Equal(t, 4, len(slotRanges))
- require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
+ slotRange, err := NewSlotRange(0, 0)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges)
+
+ slotRange, err = NewSlotRange(21, 21)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges)
+
+ slotRange, err = NewSlotRange(20, 20)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 3, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0], slotRanges)
+
+ slotRange, err = NewSlotRange(150, 150)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1], slotRanges)
+
+ slotRange, err = NewSlotRange(101, 101)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1], slotRanges)
+
+ slotRange, err = NewSlotRange(199, 199)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2], slotRanges)
+
+ slotRange, err = NewSlotRange(300, 300)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3], slotRanges)
+
+ slotRange, err = NewSlotRange(298, 298)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 5, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges)
+ require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4], slotRanges)
+
+ slotRange, err = NewSlotRange(299, 299)
+ require.NoError(t, err)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange)
+ require.Equal(t, 4, len(slotRanges), slotRanges)
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges)
}
func TestCalculateSlotRanges(t *testing.T) {
@@ -135,3 +180,148 @@ func TestCalculateSlotRanges(t *testing.T) {
assert.Equal(t, 13104, slots[4].Start)
assert.Equal(t, 16383, slots[4].Stop)
}
+
+func TestSlotRange_HasOverlap(t *testing.T) {
+ type fields struct {
+ Start int
+ Stop int
+ }
+ type args struct {
+ that *SlotRange
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ want bool
+ }{
+ {
+ name: "0-5 does not overlap 6-7",
+ fields: fields{Start: 0, Stop: 5},
+ args: args{&SlotRange{Start: 6, Stop: 7}},
+ want: false,
+ },
+ {
+ name: "0-5 does overlap 3-4",
+ fields: fields{Start: 0, Stop: 5},
+ args: args{&SlotRange{Start: 3, Stop: 4}},
+ want: true,
+ },
+ {
+ name: "0-5 does overlap 5-8",
+ fields: fields{Start: 0, Stop: 5},
+ args: args{&SlotRange{Start: 5, Stop: 8}},
+ want: true,
+ },
+ {
+ name: "0-5 does overlap 4-8",
+ fields: fields{Start: 0, Stop: 5},
+ args: args{&SlotRange{Start: 4, Stop: 8}},
+ want: true,
+ },
+ {
+ name: "0-100 does not overlap 101-150",
+ fields: fields{Start: 0, Stop: 100},
+ args: args{&SlotRange{Start: 101, Stop: 150}},
+ want: false,
+ },
+ {
+ name: "50-100 does overlap 30-50",
+ fields: fields{Start: 50, Stop: 100},
+ args: args{&SlotRange{Start: 30, Stop: 50}},
+ want: true,
+ },
+ {
+ name: "50-100 does overlap 50-51",
+ fields: fields{Start: 50, Stop: 100},
+ args: args{&SlotRange{Start: 50, Stop: 51}},
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ slotRange := &SlotRange{
+ Start: tt.fields.Start,
+ Stop: tt.fields.Stop,
+ }
+ if got := slotRange.HasOverlap(tt.args.that); got != tt.want {
+ t.Errorf("SlotRange.HasOverlap() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestCanMerge(t *testing.T) {
+ type args struct {
+ a SlotRange
+ b SlotRange
+ }
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "0-5 and 6-10 can merge",
+ args: args{SlotRange{0, 5}, SlotRange{6, 10}},
+ want: true,
+ },
+ {
+ name: "6-10 and 0-5 can merge",
+ args: args{SlotRange{6, 10}, SlotRange{0, 5}},
+ want: true,
+ },
+ {
+ name: "6-6 and 0-5 can merge",
+ args: args{SlotRange{6, 6}, SlotRange{0, 5}},
+ want: true,
+ },
+ {
+ name: "0-5 and 7-10 cannot merge",
+ args: args{SlotRange{0, 5}, SlotRange{7, 10}},
+ want: false,
+ },
+ {
+ name: "7-10 and 0-5 cannot merge",
+ args: args{SlotRange{7, 10}, SlotRange{0, 5}},
+ want: false,
+ },
+ {
+ name: "2-2 and 4-4 cannot merge",
+ args: args{SlotRange{2, 2}, SlotRange{4, 4}},
+ want: false,
+ },
+ {
+ name: "4-4 and 2-2 cannot merge",
+ args: args{SlotRange{4, 4}, SlotRange{2, 2}},
+ want: false,
+ },
+ {
+ name: "2-3 and 4-4 can merge",
+ args: args{SlotRange{2, 3}, SlotRange{4, 4}},
+ want: true,
+ },
+ {
+ name: "4-4 and 2-3 can merge",
+ args: args{SlotRange{4, 4}, SlotRange{2, 3}},
+ want: true,
+ },
+ {
+ name: "4-4 and 3-3 can merge",
+ args: args{SlotRange{4, 4}, SlotRange{3, 3}},
+ want: true,
+ },
+ {
+ name: "3-3 and 4-4 can merge",
+ args: args{SlotRange{3, 3}, SlotRange{4, 4}},
+ want: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := CanMerge(tt.args.a, tt.args.b); got != tt.want {
+ t.Errorf("CanMerge() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}