This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0169651d Add benchmark and fix some issue of background property repair (#725)
0169651d is described below
commit 0169651d420dfb5f7098ecdaf57009fb5d813a87
Author: mrproliu <[email protected]>
AuthorDate: Tue Aug 26 22:15:37 2025 +0700
Add benchmark and fix some issue of background property repair (#725)
---
.github/workflows/slow-test.yml | 7 +
api/proto/banyandb/property/v1/repair.proto | 20 +-
banyand/liaison/grpc/property.go | 9 +-
banyand/property/gossip/client.go | 15 +-
banyand/property/gossip/server.go | 9 +-
banyand/property/gossip/service.go | 2 +
banyand/property/gossip/trace.go | 12 +-
banyand/property/repair.go | 224 +++++++----
banyand/property/repair_gossip.go | 185 +++++----
bydbctl/internal/cmd/property_test.go | 244 ++++++++++++
docs/api-reference.md | 58 ++-
test/docker/base-compose.yml | 1 +
test/property_repair/README.md | 142 +++++++
test/property_repair/base-compose.yml | 111 ++++++
test/property_repair/full_data/cpu-usage.png | Bin 0 -> 60836 bytes
.../full_data/docker-compose-3nodes.yml | 65 ++++
test/property_repair/full_data/integrated_test.go | 192 +++++++++
test/property_repair/half_data/cpu-usage.png | Bin 0 -> 54254 bytes
.../half_data/docker-compose-3nodes.yml | 76 ++++
test/property_repair/half_data/integrated_test.go | 219 +++++++++++
test/property_repair/prometheus-3nodes.yml | 43 +++
.../same_data/docker-compose-3nodes.yml | 65 ++++
test/property_repair/same_data/integrated_test.go | 178 +++++++++
test/property_repair/shared_utils.go | 428 +++++++++++++++++++++
24 files changed, 2144 insertions(+), 161 deletions(-)
diff --git a/.github/workflows/slow-test.yml b/.github/workflows/slow-test.yml
index c62496d3..8974c19d 100644
--- a/.github/workflows/slow-test.yml
+++ b/.github/workflows/slow-test.yml
@@ -31,3 +31,10 @@ jobs:
with:
options: --label-filter slow
timeout-minutes: 120
+
+ property-repair:
+ if: github.repository == 'apache/skywalking-banyandb'
+ uses: ./.github/workflows/test.yml
+ with:
+ options: --label-filter property_repair
+ timeout-minutes: 120
diff --git a/api/proto/banyandb/property/v1/repair.proto b/api/proto/banyandb/property/v1/repair.proto
index 24510ed9..7c158598 100644
--- a/api/proto/banyandb/property/v1/repair.proto
+++ b/api/proto/banyandb/property/v1/repair.proto
@@ -69,7 +69,18 @@ message PropertySync {
int64 delete_time = 3;
}
-message NoMorePropertySync {}
+enum PropertySyncFromType {
+ PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED = 0;
+ PROPERTY_SYNC_FROM_TYPE_MISSING = 1; // client missing but server existing
+ PROPERTY_SYNC_FROM_TYPE_SYNC = 2; // client existing but server missing or SHA value mismatches
+}
+
+message PropertySyncWithFrom {
+ PropertySyncFromType from = 1;
+ PropertySync property = 2;
+}
+
+message WaitNextDifferData {}
message RepairRequest {
oneof data {
@@ -82,9 +93,8 @@ message RepairRequest {
// case 2: client existing but server missing
// case 3: SHA value mismatches
PropertySync property_sync = 4;
- // if client side is already send all the properties(missing or property sync)
- // which means the client side will not sending more properties to sync, server side should close the stream.
- NoMorePropertySync no_more_property_sync = 5;
+ // wait for the next differ tree summary to process
+ WaitNextDifferData wait_next_differ = 5;
}
}
@@ -96,7 +106,7 @@ message RepairResponse {
// repair stage
// case 1: return from PropertyMissing
// case 3: return if the client is older
- PropertySync property_sync = 3;
+ PropertySyncWithFrom property_sync = 3;
}
}
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 9fe4aa7e..cd4921c7 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -19,6 +19,7 @@ package grpc
import (
"context"
+ "fmt"
"math"
"sync"
"time"
@@ -280,15 +281,21 @@ func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, sh
Id: propertypkg.GetPropertyID(cur),
Property: cur,
}
+ var successCount int
futures := make([]bus.Future, 0, len(nodes))
for _, node := range nodes {
f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req))
if err != nil {
- return nil, errors.Wrapf(err, "failed to publish property update to node %s", node)
+ ps.log.Debug().Err(err).Str("node", node).Msg("failed to publish property update")
+ continue
}
+ successCount++
futures = append(futures, f)
}
+ if successCount == 0 {
+ return nil, fmt.Errorf("failed to publish property update to any node")
+ }
// Wait for all futures to complete, and which should last have one success
haveSuccess := false
var lastestError error
diff --git a/banyand/property/gossip/client.go b/banyand/property/gossip/client.go
index 2bf87ca5..5342e304 100644
--- a/banyand/property/gossip/client.go
+++ b/banyand/property/gossip/client.go
@@ -114,6 +114,7 @@ func (s *service) getRegisteredNode(id string) (*databasev1.Node, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
node, exist := s.registered[id]
+ s.log.Debug().Str("node", id).Bool("exist",
exist).Int("register_count", len(s.registered)).Msg("get registered gossip
node")
return node, exist
}
@@ -121,6 +122,9 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
if s.traceStreamSelector != nil {
s.traceStreamSelector.(schema.EventHandler).OnAddOrUpdate(md)
}
+ if selEventHandler, ok := s.sel.(schema.EventHandler); ok {
+ selEventHandler.OnAddOrUpdate(md)
+ }
if md.Kind != schema.KindNode {
return
}
@@ -129,10 +133,6 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
s.log.Warn().Msg("invalid metadata type")
return
}
- s.sel.AddNode(node)
- if s.traceStreamSelector != nil {
- s.traceStreamSelector.AddNode(node)
- }
address := node.PropertyRepairGossipGrpcAddress
if address == "" {
s.log.Warn().Stringer("node", node).Msg("node does not have gossip address, skipping registration")
@@ -143,6 +143,10 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
s.log.Warn().Stringer("node", node).Msg("node does not have a name, skipping registration")
return
}
+ s.sel.AddNode(node)
+ if s.traceStreamSelector != nil {
+ s.traceStreamSelector.AddNode(node)
+ }
s.mu.Lock()
defer s.mu.Unlock()
@@ -155,6 +159,9 @@ func (s *service) OnDelete(md schema.Metadata) {
if s.traceStreamSelector != nil {
s.traceStreamSelector.(schema.EventHandler).OnDelete(md)
}
+ if selEventHandler, ok := s.sel.(schema.EventHandler); ok {
+ selEventHandler.OnDelete(md)
+ }
if md.Kind != schema.KindNode {
return
}
diff --git a/banyand/property/gossip/server.go b/banyand/property/gossip/server.go
index aa5fc36c..689586d6 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -51,7 +51,7 @@ var (
}]}`, serviceName)
// perNodeSyncTimeout is the timeout for each node to sync the property data.
- perNodeSyncTimeout = time.Minute * 10
+ perNodeSyncTimeout = time.Hour * 1
)
func (s *service) Subscribe(listener MessageListener) {
@@ -188,11 +188,15 @@ func (q *protocolHandler) handle(ctx context.Context, request *handlingRequest)
if err != nil {
q.s.serverMetrics.totalSendToNextErr.Inc(1, request.Group)
handlingSpan.Error(err.Error())
+ q.s.log.Warn().Err(err).Stringer("request", request).
+ Msgf("failed to handle gossip message for
propagation")
}
q.s.serverMetrics.totalFinished.Inc(1, request.Group)
q.s.serverMetrics.totalLatency.Inc(n.Sub(time.Unix(0, now)).Seconds(), request.Group)
if !needsKeepPropagation {
- q.s.serverMetrics.totalPropagationCount.Inc(float64(request.Context.CurrentPropagationCount),
+ q.s.log.Info().Str("group", request.Group).Uint32("shardNum", request.ShardId).
+ Msgf("propagation message for propagation is finished")
+ q.s.serverMetrics.totalPropagationCount.Inc(1, request.Group, request.Context.OriginNode)
q.s.serverMetrics.totalPropagationPercent.Observe(
float64(request.Context.CurrentPropagationCount)/float64(request.Context.MaxPropagationCount),
request.Group)
@@ -388,6 +392,7 @@ func (s *service) newConnectionFromNode(n *databasev1.Node) (*grpc.ClientConn, e
return nil, fmt.Errorf("failed to get client transport
credentials: %w", err)
}
conn, err := grpc.NewClient(n.PropertyRepairGossipGrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+ s.log.Debug().Str("address", n.PropertyRepairGossipGrpcAddress).Msg("starting to create gRPC client connection to node")
if err != nil {
return nil, fmt.Errorf("failed to create gRPC client connection to node %s: %w", n.PropertyRepairGossipGrpcAddress, err)
}
diff --git a/banyand/property/gossip/service.go b/banyand/property/gossip/service.go
index 0f76fa4a..db44a65d 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -112,6 +112,7 @@ func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo, pip
registered: make(map[string]*databasev1.Node),
scheduleInterval: time.Hour * 2,
sel: node.NewRoundRobinSelector("", metadata),
+ port: 17932,
}
}
@@ -133,6 +134,7 @@ func (s *service) PreRun(ctx context.Context) error {
s.listeners = make([]MessageListener, 0)
s.serverMetrics = newServerMetrics(s.omr.With(serverScope))
if s.metadata != nil {
+ s.sel.OnInit([]schema.Kind{schema.KindGroup})
s.metadata.RegisterHandler("property-repair-nodes",
schema.KindNode, s)
s.metadata.RegisterHandler("property-repair-groups",
schema.KindGroup, s)
if err := s.initTracing(ctx); err != nil {
diff --git a/banyand/property/gossip/trace.go b/banyand/property/gossip/trace.go
index 1f18e132..6f5ed6da 100644
--- a/banyand/property/gossip/trace.go
+++ b/banyand/property/gossip/trace.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "sync"
"sync/atomic"
"time"
@@ -285,11 +286,12 @@ type recordTrace struct {
id string
allSpans []*recordTraceSpan
roundNum int
+ lock sync.Mutex
}
func (r *recordTrace) CreateSpan(parent Span, message string) Span {
spanID := fmt.Sprintf("%s_%d", r.s.nodeID, time.Now().UnixNano())
- r.request.TraceContext.ParentSpanId = spanID
+ r.changeParentID(spanID)
span := &recordTraceSpan{
trace: r,
id: spanID,
@@ -302,6 +304,12 @@ func (r *recordTrace) CreateSpan(parent Span, message string) Span {
return span
}
+func (r *recordTrace) changeParentID(id string) {
+ r.lock.Lock()
+ defer r.lock.Unlock()
+ r.request.TraceContext.ParentSpanId = id
+}
+
func (r *recordTrace) ActivateSpan() Span {
return r.currentSpan
}
@@ -350,7 +358,7 @@ func (r *recordTraceSpan) End() {
// if still have parent span, then this is not the root span
// change the context to parent span
if r.parent != nil {
- r.trace.request.TraceContext.ParentSpanId = r.parent.ID()
+ r.trace.changeParentID(r.parent.ID())
}
}
diff --git a/banyand/property/repair.go b/banyand/property/repair.go
index cf174201..fb0b07a3 100644
--- a/banyand/property/repair.go
+++ b/banyand/property/repair.go
@@ -62,7 +62,6 @@ import (
const (
repairBatchSearchSize = 100
- repairFileNewLine = '\n'
)
type repair struct {
@@ -126,6 +125,7 @@ func (r *repair) checkHasUpdates() (bool, error) {
}
func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) {
+ r.l.Debug().Msgf("starting building status from snapshot path %s, group: %s", snapshotPath, group)
startTime := time.Now()
defer func() {
r.metrics.totalBuildTreeFinished.Inc(1)
@@ -449,20 +449,27 @@ func (r *repairTreeFileReader) seekPosition(offset int64, whence int) error {
return nil
}
-func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) ([]*repairTreeNode, error) {
+func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) (nodes []*repairTreeNode, err error) {
+ defer func() {
+ recoverErr := recover()
+ if recoverErr != nil {
+ if recoverData, ok := recoverErr.(error); ok && err == nil {
+ err = fmt.Errorf("reading repair tree file %s failure: %w", r.file.Name(), recoverData)
+ }
+ }
+ }()
if parent == nil {
// reading the root node
- err := r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart)
- if err != nil {
+ if err = r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart); err != nil {
return nil, fmt.Errorf("seeking to root node offset %d in file %s failure: %w", r.footer.slotNodeFinishedOffset, r.file.Name(), err)
}
rootDataBytes := make([]byte, r.footer.rootNodeLen)
if _, err = io.ReadFull(r.reader, rootDataBytes); err != nil {
return nil, fmt.Errorf("reading root node data from file %s failure: %w", r.file.Name(), err)
}
- _, shaValue, err := encoding.DecodeBytes(rootDataBytes)
- if err != nil {
- return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), err)
+ _, shaValue, decodeErr := encoding.DecodeBytes(rootDataBytes)
+ if decodeErr != nil {
+ return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), decodeErr)
}
return []*repairTreeNode{
{
@@ -472,54 +479,8 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo
}, nil
}
- var err error
if parent.tp == repairTreeNodeTypeRoot {
- needSeek := false
- if r.paging == nil || r.paging.lastNode != parent || forceReFromStart {
- needSeek = true
- r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount)
- }
- if needSeek {
- // reading the slot nodes
- if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil {
- return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
- }
- }
- var slotNodeIndex, leafStartOff, leafCount int64
- var slotShaVal, slotDataBytes []byte
- count := r.paging.nextPage(pagingSize)
- nodes := make([]*repairTreeNode, 0, count)
- for i := int64(0); i < count; i++ {
- slotDataBytes, err = r.reader.ReadBytes(repairFileNewLine)
- if err != nil {
- return nil, fmt.Errorf("reading slot node data from file %s failure: %w", r.file.Name(), err)
- }
- slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes)
- if err != nil {
- return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err)
- }
- slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes)
- if err != nil {
- return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err)
- }
- slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes)
- if err != nil {
- return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err)
- }
- _, leafCount, err = encoding.BytesToVarInt64(slotDataBytes)
- if err != nil {
- return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err)
- }
- nodes = append(nodes, &repairTreeNode{
- shaValue: string(slotShaVal),
- slotInx: int32(slotNodeIndex),
- tp: repairTreeNodeTypeSlot,
- leafStart: leafStartOff,
- leafCount: leafCount,
- })
- }
-
- return nodes, nil
+ return r.readSlots(parent, pagingSize, forceReFromStart)
} else if parent.tp == repairTreeNodeTypeLeaf {
return nil, nil
}
@@ -538,11 +499,16 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo
}
var entity, shaVal []byte
count := r.paging.nextPage(pagingSize)
- nodes := make([]*repairTreeNode, 0, count)
for i := int64(0); i < count; i++ {
- leafDataBytes, err := r.reader.ReadBytes(repairFileNewLine)
+ var dataSize int64
+ err = binary.Read(r.reader, binary.LittleEndian, &dataSize)
if err != nil {
- return nil, fmt.Errorf("reading leaf node data from
file %s failure: %w", r.file.Name(), err)
+ return nil, fmt.Errorf("reading leaf node data size
from file %s failure: %w", r.file.Name(), err)
+ }
+ leafDataBytes := make([]byte, dataSize)
+ leafReadLen, err := io.ReadFull(r.reader, leafDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("reading leaf node data from
file %s failure(readed: %d): %w", r.file.Name(), leafReadLen, err)
}
leafDataBytes, entity, err = encoding.DecodeBytes(leafDataBytes)
if err != nil {
@@ -562,6 +528,61 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo
return nodes, nil
}
+func (r *repairTreeFileReader) readSlots(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) (nodes []*repairTreeNode, err error) {
+ needSeek := false
+ if r.paging == nil || r.paging.lastNode != parent || forceReFromStart {
+ needSeek = true
+ r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount)
+ }
+ if needSeek {
+ // reading the slot nodes
+ if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil {
+ return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+ }
+ }
+ var slotNodeIndex, leafStartOff, leafCount int64
+ var slotShaVal, slotDataBytes []byte
+ count := r.paging.nextPage(pagingSize)
+ for i := int64(0); i < count; i++ {
+ var dataSize int64
+ err = binary.Read(r.reader, binary.LittleEndian, &dataSize)
+ if err != nil {
+ return nil, fmt.Errorf("reading slot node data size from file %s failure: %w", r.file.Name(), err)
+ }
+ slotDataBytes = make([]byte, dataSize)
+ var readSize int
+ readSize, err = io.ReadFull(r.reader, slotDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("reading slot node data from file %s failure: %w, read %d bytes", r.file.Name(), err, readSize)
+ }
+ slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err)
+ }
+ slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err)
+ }
+ slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err)
+ }
+ _, leafCount, err = encoding.BytesToVarInt64(slotDataBytes)
+ if err != nil {
+ return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err)
+ }
+ }
+ nodes = append(nodes, &repairTreeNode{
+ shaValue: string(slotShaVal),
+ slotInx: int32(slotNodeIndex),
+ tp: repairTreeNodeTypeSlot,
+ leafStart: leafStartOff,
+ leafCount: leafCount,
+ })
+ }
+
+ return nodes, nil
+}
+
func (r *repairTreeFileReader) close() error {
return r.file.Close()
}
@@ -756,7 +777,10 @@ func (r *repairSlotFile) append(entity, shaValue []byte) error {
result := make([]byte, 0)
result = encoding.EncodeBytes(result, entity)
result = encoding.EncodeBytes(result, shaValue)
- result = append(result, repairFileNewLine)
+ err = binary.Write(r.writer, binary.LittleEndian, int64(len(result)))
+ if err != nil {
+ return fmt.Errorf("writing entity and sha value length to
repair slot file %s failure: %w", r.path, err)
+ }
_, err = r.writer.Write(result)
if err != nil {
return fmt.Errorf("writing entity and sha value to repair slot
file %s failure: %w", r.path, err)
@@ -846,7 +870,15 @@ func (r *repairTreeBuilder) build() (err error) {
data = encoding.EncodeBytes(data, slot.shaVal)
data = encoding.VarInt64ToBytes(data, slot.startOff)
data = encoding.VarInt64ToBytes(data, slot.leafCount)
- data = append(data, repairFileNewLine)
+
+ slotTotalLen := int64(len(data))
+ err = binary.Write(r.writer, binary.LittleEndian, slotTotalLen)
+ if err != nil {
+ return fmt.Errorf("writing slot node to repair tree
file failure: %w", err)
+ }
+ slotNodesLen += 8
+ slotNodesFinishedOffset += 8
+
writedLen, err = r.writer.Write(data)
if err != nil {
return fmt.Errorf("writing slot node to repair tree
file failure: %w", err)
@@ -932,18 +964,20 @@ type repairScheduler struct {
latestBuildTreeSchedule time.Time
buildTreeClock clock.Clock
gossipMessenger gossip.Messenger
- closer *run.Closer
- buildSnapshotFunc func(context.Context) (string, error)
+ l *logger.Logger
+ gossipRepairing *int32
repairTreeScheduler *timestamp.Scheduler
quickRepairNotified *int32
db *database
- l *logger.Logger
+ closer *run.Closer
metrics *repairSchedulerMetrics
- treeSlotCount int
+ buildSnapshotFunc func(context.Context) (string, error)
+ scheduleBasicFile string
buildTreeScheduleInterval time.Duration
quickBuildTreeTime time.Duration
- lastBuildTimeLocker sync.Mutex
+ treeSlotCount int
treeLocker sync.RWMutex
+ lastBuildTimeLocker sync.Mutex
}
// nolint: contextcheck
@@ -970,6 +1004,8 @@ func newRepairScheduler(
metrics: newRepairSchedulerMetrics(omr.With(propertyScope.SubScope("scheduler"))),
gossipMessenger: gossipMessenger,
treeSlotCount: treeSlotCount,
+ scheduleBasicFile: filepath.Join(db.repairBaseDir, "scheduled.json"),
+ gossipRepairing: new(int32),
}
c := timestamp.NewScheduler(l, s.buildTreeClock)
s.repairTreeScheduler = c
@@ -982,10 +1018,13 @@ func newRepairScheduler(
}
err = c.Register("trigger",
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
triggerCronExp, func(time.Time, *logger.Logger) bool {
- gossipErr := s.doRepairGossip(s.closer.Ctx())
+ l.Debug().Msgf("starting background repair gossip")
+ group, shardNum, nodes, gossipErr :=
s.doRepairGossip(s.closer.Ctx())
if gossipErr != nil {
s.l.Err(gossipErr).Msg("failed to repair
gossip")
+ return true
}
+ s.l.Info().Str("group", group).Uint32("shardNum",
shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled")
return true
})
if err != nil {
@@ -998,6 +1037,14 @@ func newRepairScheduler(
return s, nil
}
+func (r *repairScheduler) setGossipRepairing(repairing bool) {
+ val := int32(0)
+ if repairing {
+ val = 1
+ }
+ atomic.StoreInt32(r.gossipRepairing, val)
+}
+
func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron bool) {
if !r.verifyShouldExecuteBuildTree(t, triggerByCron) {
return
@@ -1012,6 +1059,12 @@ func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron bool)
func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, triggerByCron bool) bool {
r.lastBuildTimeLocker.Lock()
defer r.lastBuildTimeLocker.Unlock()
+ // ignore the build tree if the gossip repairing is in progress
+ if atomic.LoadInt32(r.gossipRepairing) == 1 {
+ r.l.Debug().Msg("gossip repairing is in progress, skipping
build tree")
+ return false
+ }
+
if !triggerByCron {
// if not triggered by cron, we need to check if the time is after the (last scheduled time + half of the interval)
if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) {
@@ -1035,6 +1088,33 @@ func (r *repairScheduler) initializeInterval() error {
return nil
}
+func (r *repairScheduler) saveHasBuildTree() error {
+ // save the latest build tree time to the database
+ now := time.Now()
+ data := make(map[string]string, 1)
+ data["lastBuildTreeTime"] = now.Format(time.RFC3339)
+ json, err := json.Marshal(data)
+ if err != nil {
+ return err
+ }
+ err = os.MkdirAll(path.Dir(r.scheduleBasicFile), storage.FilePerm)
+ if err != nil {
+ return fmt.Errorf("creating directory for repair build tree
file %s failure: %w", r.scheduleBasicFile, err)
+ }
+ return os.WriteFile(r.scheduleBasicFile, json, storage.FilePerm)
+}
+
+func (r *repairScheduler) checkHasBuildTree() (bool, error) {
+ _, err := os.Stat(r.scheduleBasicFile)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return false, nil // no build tree file, means no build tree has been done
+ }
+ return false, fmt.Errorf("checking repair build tree file existence failure: %w", err)
+ }
+ return true, nil
+}
+
//nolint:contextcheck
func (r *repairScheduler) doBuildTree() (err error) {
now := time.Now()
@@ -1043,8 +1123,14 @@ func (r *repairScheduler) doBuildTree() (err error) {
r.metrics.totalRepairBuildTreeFinished.Inc(1)
r.metrics.totalRepairBuildTreeLatency.Inc(time.Since(now).Seconds())
if err != nil {
+ r.l.Err(err).Msg("repair build tree failed")
r.metrics.totalRepairBuildTreeFailures.Inc(1)
}
+
+ saveStatusErr := r.saveHasBuildTree()
+ if saveStatusErr != nil {
+ r.l.Err(saveStatusErr).Msgf("saving repair build tree status failure")
+ }
}()
sLst := r.db.sLst.Load()
if sLst == nil {
@@ -1140,17 +1226,17 @@ func (r *repairScheduler) close() {
r.closer.CloseThenWait()
}
-func (r *repairScheduler) doRepairGossip(ctx context.Context) error {
+func (r *repairScheduler) doRepairGossip(ctx context.Context) (string, uint32, []string, error) {
group, shardNum, err := r.randomSelectGroup(ctx)
if err != nil {
- return fmt.Errorf("selecting random group failure: %w", err)
+ return "", 0, nil, fmt.Errorf("selecting random group failure: %w", err)
}
nodes, err := r.gossipMessenger.LocateNodes(group.Metadata.Name, shardNum, uint32(r.copiesCount(group)))
if err != nil {
- return fmt.Errorf("locating nodes for group %s, shard %d failure: %w", group.Metadata.Name, shardNum, err)
+ return "", 0, nil, fmt.Errorf("locating nodes for group %s, shard %d failure: %w", group.Metadata.Name, shardNum, err)
}
- return r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum)
+ return group.Metadata.Name, shardNum, nodes, r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum)
}
func (r *repairScheduler) randomSelectGroup(ctx context.Context) (*commonv1.Group, uint32, error) {
diff --git a/banyand/property/repair_gossip.go b/banyand/property/repair_gossip.go
index 4468c300..5babb981 100644
--- a/banyand/property/repair_gossip.go
+++ b/banyand/property/repair_gossip.go
@@ -57,6 +57,13 @@ func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shar
if err != nil {
return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err)
}
+ if !stateExist {
+ // check has scheduled or not
+ stateExist, err = b.scheduler.checkHasBuildTree()
+ if err != nil {
+ return nil, false, fmt.Errorf("failed to check
if the tree state file exists: %w", err)
+ }
+ }
// if the tree is nil, it means the tree is no data
return &emptyRepairTreeReader{}, stateExist, nil
}
@@ -198,7 +205,9 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
startSyncSpan.Tag(gossip.TraceTagTargetNode, nextNode.Target())
client := propertyv1.NewRepairServiceClient(nextNode)
var hasPropertyUpdated bool
+ r.scheduler.setGossipRepairing(true)
defer func() {
+ r.scheduler.setGossipRepairing(false)
if err != nil {
startSyncSpan.Tag("has_property_updates",
fmt.Sprintf("%t", hasPropertyUpdated))
startSyncSpan.Error(err.Error())
@@ -243,6 +252,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
sendTreeSummarySpan.End()
// if the root node matched, then ignore the repair
if rootMatch {
+ r.scheduler.l.Debug().Msgf("tree root for group %s, shard %d
matched, no need to repair", request.Group, request.ShardId)
return nil
}
@@ -259,6 +269,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
recvResp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
+ r.scheduler.l.Debug().Msgf("no more messages
from server, client side finished syncing properties for group %s, shard %d",
request.Group, request.ShardId)
return nil
}
return fmt.Errorf("failed to keep receive tree summary
from server: %w", err)
@@ -274,27 +285,38 @@ func (r *repairGossipClient) Rev(ctx context.Context,
tracer gossip.Trace, nextN
differSpan.End()
return nil
}
+ r.scheduler.l.Debug().Msgf("received differ tree
summary from server, nodes count: %d", len(resp.DifferTreeSummary.Nodes))
r.handleDifferSummaryFromServer(ctx, stream,
resp.DifferTreeSummary, reader, syncShard, rootNode, leafReader,
¬ProcessingClientNode, ¤tComparingClientNode)
firstTreeSummaryResp = false
+
+ if err = stream.Send(&propertyv1.RepairRequest{
+ Data: &propertyv1.RepairRequest_WaitNextDiffer{
+ WaitNextDiffer: &propertyv1.WaitNextDifferData{},
+ },
+ }); err != nil {
+ differSpan.Error(err.Error())
+ r.scheduler.l.Warn().Err(err).Msgf("failed to send wait next differ request to server, group: %s, shard: %d", request.Group, request.ShardId)
+ }
differSpan.End()
case *propertyv1.RepairResponse_PropertySync:
+ r.scheduler.l.Debug().Msgf("received repair response
from server")
// step 3: keep receiving messages from the server
// if the server still sending different nodes, we
should keep reading them
// if the server sends a PropertySync, we should repair
the property and send the newer property back to the server if needed
sync := resp.PropertySync
syncSpan := tracer.CreateSpan(startSyncSpan, "repair property")
syncSpan.Tag(gossip.TraceTagOperateType, gossip.TraceTagOperateRepairProperty)
- syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Id))
- updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime)
+ syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Property.Id))
+ updated, newer, err := syncShard.repair(ctx, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime)
syncSpan.Tag("updated", fmt.Sprintf("%t", updated))
syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer !=
nil))
if err != nil {
syncSpan.Error(err.Error())
- r.scheduler.l.Warn().Err(err).Msgf("failed to
repair property %s", sync.Id)
+ r.scheduler.l.Warn().Err(err).Msgf("failed to
repair property %s", sync.Property.Id)
r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group,
fmt.Sprintf("%d", request.ShardId))
}
if updated {
- r.scheduler.l.Debug().Msgf("successfully
repaired property %s on client side", sync.Id)
+ r.scheduler.l.Debug().Msgf("successfully
repaired property %s on client side", sync.Property.Id)
r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group,
fmt.Sprintf("%d", request.ShardId))
hasPropertyUpdated = true
syncSpan.End()
@@ -302,7 +324,10 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
}
// if the property hasn't been updated, and the newer property is not nil,
// which means the property is newer than the server side,
- if !updated && newer != nil {
+
+ // if sync.From is PROPERTY_MISSING, it means the client doesn't have the property while the server side does,
+ // but the client actually holds a newer version, so the client is already up to date and there is no need to send the property back to the server
+ if !updated && newer != nil && sync.From != propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING {
var p propertyv1.Property
err = protojson.Unmarshal(newer.source, &p)
if err != nil {
@@ -371,14 +396,7 @@ func (r *repairGossipClient) handleDifferSummaryFromServer(
// then queried and sent property to server
r.queryPropertyAndSendToServer(ctx, syncShard, (*notProcessingClientNode).entity, stream)
}
- err := stream.Send(&propertyv1.RepairRequest{
- Data: &propertyv1.RepairRequest_NoMorePropertySync{
- NoMorePropertySync: &propertyv1.NoMorePropertySync{},
- },
- })
- if err != nil {
- r.scheduler.l.Warn().Err(err).Msgf("failed to send no more property sync request to server")
- }
+ r.scheduler.l.Debug().Msg("no more property sync request sent to server")
return
}
@@ -568,7 +586,11 @@ func newRepairGossipServer(s *repairScheduler) *repairGossipServer {
}
}
-func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error {
+func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) (err error) {
+ r.scheduler.setGossipRepairing(true)
+ defer func() {
+ r.scheduler.setGossipRepairing(false)
+ }()
summary, reader, err := r.combineTreeSummary(s)
if err != nil {
return fmt.Errorf("failed to receive tree summary request: %w",
err)
@@ -583,6 +605,9 @@ func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.Rep
shardID := summary.shardID
var hasPropertyUpdated bool
defer func() {
+ if err != nil {
+ r.scheduler.l.Warn().Err(err).Msgf("server failed to
repair gossip for group %s, shard %d", group, shardID)
+ }
if hasPropertyUpdated {
err = r.scheduler.buildingTree([]common.ShardID{common.ShardID(shardID)}, group, true)
if err != nil {
@@ -621,42 +646,10 @@ func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.Rep
serverMissingSlots = append(serverMissingSlots, clientSlot.index)
}
}
- sent, err := r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, s)
- if err != nil {
- r.scheduler.l.Warn().Err(err).Msgf("failed to send different slots to client")
- }
- // send the tree and no more different slots needs to be sent
- err = r.sendEmptyDiffer(s)
- if !sent {
- return err
- } else if err != nil {
- // should keep the message receiving loop
- r.scheduler.l.Warn().Msgf("sent no difference slot to client failure, error: %v", err)
- }
- for {
- missingOrSyncRequest, err := s.Recv()
- if err != nil {
- if errors.Is(err, io.EOF) {
- return nil
- }
- return fmt.Errorf("failed to receive missing or sync request: %w", err)
- }
- syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID))
- if err != nil {
- return fmt.Errorf("shard %d load failure on server side: %w", shardID, err)
- }
- switch req := missingOrSyncRequest.Data.(type) {
- case *propertyv1.RepairRequest_PropertyMissing:
- r.processPropertyMissing(s.Context(), syncShard, req.PropertyMissing, s)
- case *propertyv1.RepairRequest_PropertySync:
- if r.processPropertySync(s.Context(), syncShard, req.PropertySync, s, group) {
- hasPropertyUpdated = true
- }
- case *propertyv1.RepairRequest_NoMorePropertySync:
- // if the client has no more property sync, the server side should stop the sync
- return nil
- }
- }
+
+ // send differ slots to the client and wait for the client process
+ r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, group, shardID, &hasPropertyUpdated, s)
+ return nil
}
func (r *repairGossipServer) combineTreeSummary(
@@ -770,13 +763,17 @@ func (r *repairGossipServer) processPropertySync(
// send the newer property to the client
err = s.Send(&propertyv1.RepairResponse{
Data: &propertyv1.RepairResponse_PropertySync{
- PropertySync: &propertyv1.PropertySync{
- Id: newer.id,
- Property: &p,
- DeleteTime: newer.deleteTime,
+ PropertySync: &propertyv1.PropertySyncWithFrom{
+ From: propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_SYNC,
+ Property: &propertyv1.PropertySync{
+ Id: newer.id,
+ Property: &p,
+ DeleteTime: newer.deleteTime,
+ },
},
},
})
+ r.scheduler.l.Debug().Msgf("sending repaired property %s on
server side", sync.Id)
if err != nil {
r.scheduler.l.Warn().Err(err).Msgf("failed to send
newer property sync response to client, entity: %s", newer.id)
return false
@@ -801,13 +798,17 @@ func (r *repairGossipServer) processPropertyMissing(
}
err = s.Send(&propertyv1.RepairResponse{
Data: &propertyv1.RepairResponse_PropertySync{
- PropertySync: &propertyv1.PropertySync{
- Id: property.id,
- Property: data,
- DeleteTime: property.deleteTime,
+ PropertySync: &propertyv1.PropertySyncWithFrom{
+ From: propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING,
+ Property: &propertyv1.PropertySync{
+ Id: property.id,
+ Property: data,
+ DeleteTime: property.deleteTime,
+ },
},
},
})
+ r.scheduler.l.Debug().Msgf("sending missing property on server side:
%s", property.id)
if err != nil {
r.scheduler.l.Warn().Err(err).Msgf("failed to send property
sync response to client, entity: %s", missing.Entity)
return
@@ -818,16 +819,21 @@ func (r *repairGossipServer) sendDifferSlots(
reader repairTreeReader,
clientMismatchSlots []*repairTreeNode,
serverMissingSlots []int32,
+ group string,
+ shardID uint32,
+ hasPropertyUpdated *bool,
s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse],
-) (hasSent bool, err error) {
+) {
var leafNodes []*repairTreeNode
+ var err error
// send server mismatch slots to the client
for _, node := range clientMismatchSlots {
for {
leafNodes, err = reader.read(node, gossipMerkleTreeReadPageSize, false)
if err != nil {
- return hasSent, fmt.Errorf("failed to read leaf nodes for slot %d: %w", node.slotInx, err)
+ r.scheduler.l.Warn().Err(err).Msgf("failed to read leaf nodes for slot %d", node.slotInx)
+ continue
}
// if there are no more leaf nodes, we can skip this slot
if len(leafNodes) == 0 {
@@ -853,8 +859,12 @@ func (r *repairGossipServer) sendDifferSlots(
if err != nil {
r.scheduler.l.Warn().Err(err).
Msgf("failed to send leaf nodes for
slot %d", node.slotInx)
- } else {
- hasSent = true
+ continue
+ }
+
+ if err = r.recvMsgAndWaitReadNextDiffer(s, group,
shardID, hasPropertyUpdated); err != nil {
+ r.scheduler.l.Warn().Err(err).Msgf("failed to
waiting the client side process differ finished")
+ return
}
}
}
@@ -879,11 +889,56 @@ func (r *repairGossipServer) sendDifferSlots(
if err != nil {
r.scheduler.l.Warn().Err(err).
Msgf("failed to send missing slots")
- } else {
- hasSent = true
+ return
+ }
+ r.scheduler.l.Debug().Msg("successfully sent differ-tree summary to client")
+ if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, hasPropertyUpdated); err != nil {
+ r.scheduler.l.Warn().Err(err).Msgf("failed to waiting the client side process differ finished")
+ return
+ }
+ }
+
+ // send the empty differ response to the client to make sure the client side finished processing
+ if err = r.sendEmptyDiffer(s); err != nil {
+ r.scheduler.l.Warn().Err(err).Msgf("failed to send empty differ response to client")
+ return
+ }
+ if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, hasPropertyUpdated); err != nil {
+ r.scheduler.l.Warn().Err(err).Msgf("failed to waiting the client side process empty differ finished")
+ return
+ }
+}
+
+func (r *repairGossipServer) recvMsgAndWaitReadNextDiffer(
+ s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse],
+ group string,
+ shardID uint32,
+ hasPropertyUpdated *bool,
+) error {
+ for {
+ missingOrSyncRequest, err := s.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ r.scheduler.l.Debug().Msgf("client closed the
stream, no more missing or sync request")
+ return nil
+ }
+ return fmt.Errorf("failed to receive missing or sync
request: %w", err)
+ }
+ syncShard, err := r.scheduler.db.loadShard(s.Context(),
common.ShardID(shardID))
+ if err != nil {
+ return fmt.Errorf("shard %d load failure on server
side: %w", shardID, err)
+ }
+ switch req := missingOrSyncRequest.Data.(type) {
+ case *propertyv1.RepairRequest_PropertyMissing:
+ r.processPropertyMissing(s.Context(), syncShard,
req.PropertyMissing, s)
+ case *propertyv1.RepairRequest_PropertySync:
+ if r.processPropertySync(s.Context(), syncShard,
req.PropertySync, s, group) {
+ *hasPropertyUpdated = true
+ }
+ case *propertyv1.RepairRequest_WaitNextDiffer:
+ return nil
}
}
- return hasSent, nil
}
func (r *repairGossipServer) sendEmptyDiffer(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error {
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index cb773acb..16675539 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -701,6 +701,220 @@ projection:
})
})
+var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ var nodeIDs []string
+ var nodeRepairAddrs []string
+ var nodeDirs []string
+ var closeNodes []func()
+ var messenger gossip.Messenger
+ var server embeddedetcd.Server
+ var ep string
+ nodeCount := 5
+ closedNodeCount := 3
+
+ BeforeEach(func() {
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ var ports []int
+ var err error
+ var spaceDefs []func()
+
+ // Create 5 data nodes
+ nodeIDs = make([]string, nodeCount)
+ nodeRepairAddrs = make([]string, nodeCount)
+ nodeDirs = make([]string, nodeCount)
+ closeNodes = make([]func(), nodeCount)
+ spaceDefs = make([]func(), nodeCount)
+
+ // Create data directories for 5 nodes
+ for i := 0; i < nodeCount; i++ {
+ nodeDirs[i], spaceDefs[i], err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ }
+
+ // Setup cluster with etcd server
+ By("Starting etcd server")
+ ports, err = test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err = embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+
+ // Start 5 data nodes
+ for i := 0; i < nodeCount; i++ {
+ By(fmt.Sprintf("Starting data node %d", i))
+ nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i],
+ "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s")
+ // Update node ID to use 127.0.0.1
+ _, nodePort, found := strings.Cut(nodeIDs[i], ":")
+ Expect(found).To(BeTrue())
+ nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort)
+ }
+
+ By("Starting liaison node")
+ _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(ep)
+ addr = httpSchema + liaisonHTTPAddr
+
+ By("Creating test group with shard=1, copies=5")
+ defUITemplateWithSchema(rootCmd, addr, 1, nodeCount)
+
+ // Setup gossip messenger
+ messenger = gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+ messenger.Validate()
+ err = messenger.PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{
+ NodeID: "test-client",
+ }))
+ Expect(err).NotTo(HaveOccurred())
+
+ for i := 0; i < nodeCount; i++ {
+ registerNodeToMessenger(messenger, nodeIDs[i], nodeRepairAddrs[i])
+ }
+
+ deferFunc = func() {
+ messenger.GracefulStop()
+ closerLiaisonNode()
+ for i := 0; i < nodeCount; i++ {
+ if closeNodes[i] != nil {
+ closeNodes[i]()
+ }
+ }
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ for i := 0; i < nodeCount; i++ {
+ spaceDefs[i]()
+ }
+ }
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+
+ It("should handle node failures and repairs correctly", func() {
+ By("Writing initial test data")
+ beforeFirstWrite := time.Now()
+ applyData(rootCmd, addr, p1YAML, true, propertyTagCount)
+
+ By("Verifying data can be queried initially")
+ queryData(rootCmd, addr, propertyGroup, property1ID, 1, func(data string, resp *propertyv1.QueryResponse) {
+ Expect(data).To(ContainSubstring("foo111"))
+ })
+
+ By("Verifying repair tree regeneration after first write")
+ waitForRepairTreeRegeneration(nodeDirs, propertyGroup, beforeFirstWrite)
+
+ By(fmt.Sprintf("Closing %d nodes", closedNodeCount))
+ for i := 0; i < closedNodeCount; i++ {
+ GinkgoWriter.Printf("Closing node %d\n", i)
+ closeNodes[i]()
+ closeNodes[i] = nil
+ }
+
+ By(fmt.Sprintf("Verifying data can still be queried after
closing %d nodes", closedNodeCount))
+ queryData(rootCmd, addr, propertyGroup, property1ID, 1,
func(data string, resp *propertyv1.QueryResponse) {
+ Expect(data).To(ContainSubstring("foo111"))
+ })
+
+ By(fmt.Sprintf("Writing new test data with %d nodes down",
closedNodeCount))
+ beforeSecondWrite := time.Now()
+ applyData(rootCmd, addr, p3YAML, true, propertyTagCount)
+
+ By("Verifying new data can be queried")
+ queryData(rootCmd, addr, propertyGroup, property2ID, 1, func(data string, resp *propertyv1.QueryResponse) {
+ Expect(data).To(ContainSubstring("foo-mesh"))
+ })
+
+ By("Verifying repair tree regeneration on remaining nodes after
second write")
+
waitForRepairTreeRegeneration(nodeDirs[closedNodeCount:nodeCount],
propertyGroup, beforeSecondWrite)
+
+ By(fmt.Sprintf("Restarting the %d closed nodes with existing
data directories", closedNodeCount))
+ for i := 0; i < closedNodeCount; i++ {
+ GinkgoWriter.Printf("Restarting node %d\n", i)
+ nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i],
+ "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s")
+ // Update node ID to use 127.0.0.1
+ _, nodePort, found := strings.Cut(nodeIDs[i], ":")
+ Expect(found).To(BeTrue())
+ nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort)
+ // Re-register to messenger
+ registerNodeToMessenger(messenger, nodeIDs[i], nodeRepairAddrs[i])
+ }
+
+ By("Triggering repair operations")
+ err := messenger.Propagation(nodeIDs, propertyGroup, 0)
+ Expect(err).NotTo(HaveOccurred())
+
+ By("Verifying repair tree regeneration after repair operations")
+ waitForRepairTreeRegeneration(nodeDirs, propertyGroup, beforeSecondWrite)
+
+ By("Closing all nodes before checking data consistency")
+ for i := 0; i < nodeCount; i++ {
+ if closeNodes[i] != nil {
+ GinkgoWriter.Printf("Closing node %d for data
consistency check\n", i)
+ closeNodes[i]()
+ closeNodes[i] = nil
+ }
+ }
+
+ By("Verifying data consistency across all nodes after repair")
+ Eventually(func() bool {
+ allNodesConsistent := true
+ for i := 0; i < nodeCount; i++ {
+ store, err := generateInvertedStore(nodeDirs[i])
+ if err != nil {
+ GinkgoWriter.Printf("Node %d store
error: %v\n", i, err)
+ allNodesConsistent = false
+ continue
+ }
+ query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+ Groups: []string{propertyGroup},
+ }, "_group", "_entity_id")
+ if err != nil {
+ GinkgoWriter.Printf("Node %d query
build error: %v\n", i, err)
+ allNodesConsistent = false
+ continue
+ }
+ searchResult, err := store.Search(context.Background(), []index.FieldKey{sourceFieldKey, deletedFieldKey}, query, 10)
+ if err != nil {
+ GinkgoWriter.Printf("Node %d search
error: %v\n", i, err)
+ allNodesConsistent = false
+ continue
+ }
+
+ // Filter non-deleted properties
+ nonDeletedCount := 0
+ for _, result := range searchResult {
+ deleted := convert.BytesToBool(result.Fields[deletedFieldKey.TagName])
+ if !deleted {
+ nonDeletedCount++
+ }
+ }
+
+ GinkgoWriter.Printf("Node %d has %d total
properties, %d non-deleted\n", i, len(searchResult), nonDeletedCount)
+ if nonDeletedCount < 2 {
+ allNodesConsistent = false
+ }
+ }
+ return allNodesConsistent
+ }, flags.EventuallyTimeout).Should(BeTrue())
+ })
+})
+
func filterProperties(doc []index.SeriesDocument, filter func(property *propertyv1.Property, deleted bool) bool) (res []*propertyv1.Property) {
for _, p := range doc {
deleted := convert.BytesToBool(p.Fields[deletedFieldKey.TagName])
@@ -867,3 +1081,33 @@ func registerNodeToMessenger(m gossip.Messenger, nodeID, gossipRepairAddr string
},
})
}
+
+func getRepairTreeFilePath(nodeDir, group string) string {
+ return path.Join(nodeDir, "property", "repairs", "shard0", fmt.Sprintf("state-tree-%s.data", group))
+}
+
+func getRepairTreeModTime(nodeDir, group string) (time.Time, error) {
+ filePath := getRepairTreeFilePath(nodeDir, group)
+ info, err := os.Stat(filePath)
+ if err != nil {
+ return time.Time{}, err
+ }
+ return info.ModTime(), nil
+}
+
+func waitForRepairTreeRegeneration(nodeDirs []string, group string, beforeTime time.Time) {
+ Eventually(func() bool {
+ allRegenerated := true
+ for _, nodeDir := range nodeDirs {
+ modTime, err := getRepairTreeModTime(nodeDir, group)
+ if err != nil {
+ allRegenerated = false
+ continue
+ }
+ if !modTime.After(beforeTime) {
+ allRegenerated = false
+ }
+ }
+ return allRegenerated
+ }, flags.EventuallyTimeout).Should(BeTrue(), "All nodes should regenerate repair tree after data write")
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2fb6bbd0..e7d778cc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -258,9 +258,9 @@
- [banyandb/property/v1/repair.proto](#banyandb_property_v1_repair-proto)
- [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary)
- - [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync)
- [PropertyMissing](#banyandb-property-v1-PropertyMissing)
- [PropertySync](#banyandb-property-v1-PropertySync)
+ - [PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom)
- [RepairRequest](#banyandb-property-v1-RepairRequest)
- [RepairResponse](#banyandb-property-v1-RepairResponse)
- [RootCompare](#banyandb-property-v1-RootCompare)
@@ -268,6 +268,9 @@
- [TreeRoot](#banyandb-property-v1-TreeRoot)
- [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA)
- [TreeSlots](#banyandb-property-v1-TreeSlots)
+ - [WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData)
+
+ - [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType)
- [RepairService](#banyandb-property-v1-RepairService)
@@ -3897,16 +3900,6 @@ Property stores the user defined data
-<a name="banyandb-property-v1-NoMorePropertySync"></a>
-
-### NoMorePropertySync
-
-
-
-
-
-
-
<a name="banyandb-property-v1-PropertyMissing"></a>
### PropertyMissing
@@ -3939,6 +3932,22 @@ Property stores the user defined data
+<a name="banyandb-property-v1-PropertySyncWithFrom"></a>
+
+### PropertySyncWithFrom
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| from | [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType) | | |
+| property | [PropertySync](#banyandb-property-v1-PropertySync) | | |
+
+
+
+
+
+
<a name="banyandb-property-v1-RepairRequest"></a>
### RepairRequest
@@ -3951,7 +3960,7 @@ Property stores the user defined data
| tree_slots | [TreeSlots](#banyandb-property-v1-TreeSlots) | | |
| property_missing | [PropertyMissing](#banyandb-property-v1-PropertyMissing) | | repair stage case 1: client missing but server existing |
| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | case 2: client existing but server missing case 3: SHA value mismatches |
-| no_more_property_sync | [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) | | if client side is already send all the properties(missing or property sync) which means the client side will not sending more properties to sync, server side should close the stream. |
+| wait_next_differ | [WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData) | | wait for the next differ tree summary to process |
@@ -3968,7 +3977,7 @@ Property stores the user defined data
| ----- | ---- | ----- | ----------- |
| root_compare | [RootCompare](#banyandb-property-v1-RootCompare) | | compare
stage |
| differ_tree_summary |
[DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) | | |
-| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | repair stage case 1: return from PropertyMissing case 3: return if the client is older |
+| property_sync | [PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom) | | repair stage case 1: return from PropertyMissing case 3: return if the client is older |
@@ -4056,8 +4065,31 @@ Property stores the user defined data
+
+<a name="banyandb-property-v1-WaitNextDifferData"></a>
+
+### WaitNextDifferData
+
+
+
+
+
+
+
+<a name="banyandb-property-v1-PropertySyncFromType"></a>
+
+### PropertySyncFromType
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED | 0 | |
+| PROPERTY_SYNC_FROM_TYPE_MISSING | 1 | client missing but server existing |
+| PROPERTY_SYNC_FROM_TYPE_SYNC | 2 | client existing but server missing or SHA value mismatches |
+
+
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 0488a9a7..d583fc0f 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -47,6 +47,7 @@ services:
- 17912
- 2121
- 6060
+ - 17932
command: data --etcd-endpoints=http://etcd:2379
healthcheck:
test: ["CMD", "./bydbctl", "health", "--addr=http://127.0.0.1:17913"]
diff --git a/test/property_repair/README.md b/test/property_repair/README.md
new file mode 100644
index 00000000..56415d43
--- /dev/null
+++ b/test/property_repair/README.md
@@ -0,0 +1,142 @@
+# Property Repair Benchmark Test
+
+## Purpose
+
+This performance evaluation measures the execution efficiency of the backend’s automated Property repair feature.
+It is run with the following key settings:
+1. The cluster is configured with a maximum of three nodes, one group, one shard, and two replicas.
+2. Each shard contains **100,000** records, with each record approximately **2KB** in size.
+
+## Requirements
+
+- Docker and Docker Compose
+- Go 1.21+
+
+### Building the BanyanDB Docker Image
+
+Please make sure you have the latest version of the BanyanDB codebase; building the Docker image is required before running the tests.
+
+```bash
+export TARGET_OS=linux
+export PLATFORMS=linux/arm64 # please replace with your platform
+make clean && make generate && make release && make docker.build
+```
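+
+Before starting any of the compose files below, you can confirm the freshly built image is available locally (the compose files in this directory assume the `apache/skywalking-banyandb:latest` tag):
+
+```bash
+docker images apache/skywalking-banyandb:latest
+```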
+
+## Monitoring
+
+The performance evaluation is primarily conducted by observing logs and monitoring metrics in Prometheus.
+
+The logs provide clear markers for the start and end times of the backend repair process.
+In Prometheus, by visiting `http://localhost:9090`, you can view system performance metrics for each machine in the cluster.
+
+### CPU Usage Monitoring
+
+Use this PromQL query to monitor CPU usage during property repair:
+
+```promql
+avg by (instance) (
+ rate(process_cpu_seconds_total[1m]) * 100
+)
+```
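+
+Memory usage can be watched in the same way. The query below is only a minimal sketch; it assumes the standard Go process metrics are exposed on the same observability endpoint:
+
+```promql
+max by (instance) (
+  process_resident_memory_bytes
+)
+```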
+
+## Case 1: Full Data Property Repair
+
+In the first test case, a brand-new, empty node will be started,
+and **100,000** records will be synchronized to it in a single batch.
+This test is designed to measure the node's CPU usage and the total time consumed during the process.
+
+### Running the Integrated Test
+
+The full data test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/full_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 1 replica and loads 100,000 properties
+3. Updates the group to 2 replicas to trigger property repair
+4. Monitors the repair process through Prometheus metrics
+5. Verifies both propagation count and repair success count increase
+
+Then, wait for the propagation to complete in the cluster.
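+
+The repair progress can also be polled from the command line through the Prometheus HTTP API. This is only a sketch; replace `<propagation_metric>` with the propagation-count metric name exposed by your build:
+
+```bash
+curl -s 'http://localhost:9090/api/v1/query' --data-urlencode 'query=<propagation_metric>'
+```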
+
+### Result
+
+After waiting for the Property Repair process to complete, the following information was recorded:
+1. **Duration**: The total estimated time taken was approximately **36 minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about **1.4 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+
+
+## Case 2: Half-Data Property Repair
+
+In the second test case, three nodes are started, with the group’s number of copies initially set to two.
+First, **50,000** records are written to all three nodes.
+Next, the group’s copies setting is changed to one, and the remaining **50,000** records are written to only two fixed nodes.
+At this point, the third node’s dataset is missing half of the data compared to the other nodes.
+Finally, the group’s copies setting is changed back to two, allowing the backend Property Repair process to perform the data synchronization automatically.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/half_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 2 replicas and loads 50,000 properties
+3. Reduces the group to 1 replica
+4. Writes an additional 50,000 properties (creating data inconsistency)
+5. Increases the group back to 2 replicas to trigger property repair
+6. Monitors the repair process through Prometheus metrics
+7. Verifies both propagation count and repair success count increase
+
+Then, wait for the propagation to complete in the cluster.
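+
+If you want to inspect the cluster between steps, the compose file in this directory can also be managed manually (a sketch assuming the default file name used by this test):
+
+```bash
+docker compose -f docker-compose-3nodes.yml up -d
+# ... inspect nodes, Prometheus, and logs ...
+docker compose -f docker-compose-3nodes.yml down -v
+```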
+
+### Result
+
+After waiting for the Property Repair process to complete, the following information was recorded:
+1. **Duration**: The total estimated time taken was approximately **30 minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about **1.1 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+
+
+## Case 3: All Nodes Data are the Same
+
+In the third test case, which represents the most common scenario, all nodes contain identical data.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/same_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 2 replicas and loads 100,000 properties across all nodes
+3. Monitors the repair process through Prometheus metrics
+4. Verifies propagation count increases (repair count verification is skipped since data is consistent)
+
+Then, wait for the propagation to complete in the cluster.
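+
+The start and end of a repair round can also be spotted directly in the data-node logs, for example (a sketch using the container names from the base compose file):
+
+```bash
+docker logs -f data-node-3 2>&1 | grep -i "repair gossip"
+```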
+
+### Result
+
+After waiting for the Property Repair process to complete, the following
information was recorded:
+1. **Duration**: Less than **1 minute**.
+2. **CPU Consumption**: CPU usage was negligible, with almost no measurable impact on the nodes.
\ No newline at end of file
diff --git a/test/property_repair/base-compose.yml
b/test/property_repair/base-compose.yml
new file mode 100644
index 00000000..112a48c5
--- /dev/null
+++ b/test/property_repair/base-compose.yml
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+
+x-data-base: &data_base
+ extends:
+ file: ../docker/base-compose.yml
+ service: data
+ image: apache/skywalking-banyandb:latest
+ depends_on:
+ etcd:
+ condition: service_healthy
+ cpus: 2
+ mem_limit: 4g
+ command: &base_data_cmd |
+ data --etcd-endpoints=http://etcd:2379
+ --observability-modes=prometheus
+ --observability-listener-addr=:2121
+ --property-repair-enabled=true
+ --property-repair-build-tree-cron="@every 30s"
+ --stream-root-path=/tmp/banyandb-data
+ --measure-root-path=/tmp/banyandb-data
+ --property-root-path=/tmp/banyandb-data
+
+# Base services for property repair tests
+services:
+ etcd:
+ extends:
+ file: ../docker/base-compose.yml
+ service: etcd
+ ports:
+ - "2379:2379"
+
+ liaison:
+ extends:
+ file: ../docker/base-compose.yml
+ service: liaison
+ image: apache/skywalking-banyandb:latest
+ depends_on:
+ etcd:
+ condition: service_healthy
+ cpus: 2
+ mem_limit: 3g
+ container_name: liaison
+ ports:
+ - "17912:17912"
+ - "17913:17913"
+ - "2121:2121" # observability port for metrics
+ command: liaison --etcd-endpoints=http://etcd:2379
--observability-modes=prometheus
+
+ # Data nodes with specific configurations
+ data-node-1:
+ <<: *data_base
+ hostname: data-node-1
+ container_name: data-node-1
+ ports:
+ - "2122:2121" # observability port
+ - "6061:6060" # pprof port
+ command: |
+ data --etcd-endpoints=http://etcd:2379
+ --observability-modes=prometheus
+ --observability-listener-addr=:2121
+ --property-repair-enabled=true
+ --property-repair-build-tree-cron="@every 30s"
+ --stream-root-path=/tmp/banyandb-data
+ --measure-root-path=/tmp/banyandb-data
+ --property-root-path=/tmp/banyandb-data
+ --property-repair-trigger-cron="*/10 * * * *"
+
+ data-node-2:
+ <<: *data_base
+ hostname: data-node-2
+ container_name: data-node-2
+ ports:
+ - "2123:2121" # observability port
+ - "6062:6060" # pprof port
+ command: *base_data_cmd
+
+ data-node-3:
+ <<: *data_base
+ hostname: data-node-3
+ container_name: data-node-3
+ ports:
+ - "2124:2121" # observability port
+ - "6063:6060" # pprof port
+ command: *base_data_cmd
+
+ # Prometheus for monitoring
+ prometheus:
+ image: prom/prometheus:latest
+ ports:
+ - "9090:9090"
+ command:
+ - '--config.file=/etc/prometheus/prometheus.yml'
+ - '--storage.tsdb.path=/prometheus'
+ - '--web.console.libraries=/etc/prometheus/console_libraries'
+ - '--web.console.templates=/etc/prometheus/consoles'
\ No newline at end of file
diff --git a/test/property_repair/full_data/cpu-usage.png
b/test/property_repair/full_data/cpu-usage.png
new file mode 100644
index 00000000..54410cad
Binary files /dev/null and b/test/property_repair/full_data/cpu-usage.png differ
diff --git a/test/property_repair/full_data/docker-compose-3nodes.yml
b/test/property_repair/full_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..20663b00
--- /dev/null
+++ b/test/property_repair/full_data/docker-compose-3nodes.yml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+services:
+ etcd:
+ extends:
+ file: ../base-compose.yml
+ service: etcd
+ networks:
+ - property-repair
+
+ liaison:
+ extends:
+ file: ../base-compose.yml
+ service: liaison
+ networks:
+ - property-repair
+
+ data-node-1:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-1
+ networks:
+ - property-repair
+
+ data-node-2:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-2
+ networks:
+ - property-repair
+
+ data-node-3:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-3
+ networks:
+ - property-repair
+
+ prometheus:
+ extends:
+ file: ../base-compose.yml
+ service: prometheus
+ networks:
+ - property-repair
+ volumes:
+ - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+ property-repair:
+ driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/full_data/integrated_test.go
b/test/property_repair/full_data/integrated_test.go
new file mode 100644
index 00000000..1e4e3baa
--- /dev/null
+++ b/test/property_repair/full_data/integrated_test.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fulldata
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ propertyrepair
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+ composeFile string
+ conn *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ propertyClient databasev1.PropertyRegistryServiceClient
+ propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairIntegrated(t *testing.T) {
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Property Repair Integrated Test Suite",
ginkgo.Label("integration", "slow", "property_repair", "full_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+ fmt.Println("Starting Property Repair Integration Test Suite...")
+
+ // Disable Ryuk reaper to avoid container creation issues
+ os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+ // Set Docker host if needed (for local development)
+ if os.Getenv("DOCKER_HOST") == "" {
+ os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+ }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+ if conn != nil {
+ _ = conn.Close()
+ }
+ if composeFile != "" {
+ fmt.Println("Stopping compose stack...")
+ propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+ }
+})
+
+var _ = ginkgo.Describe("Property Repair Full Data Test", ginkgo.Ordered,
func() {
+ ginkgo.Describe("Step 1: Initial Data Load with 2 Nodes", func() {
+ ginkgo.It("Should start 3 data node cluster", func() {
+ // Initialize compose stack with 3-node configuration
+ var err error
+ composeFile, err =
filepath.Abs("docker-compose-3nodes.yml")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ fmt.Printf("Using compose file: %s\n", composeFile)
+
+ // Start the docker compose stack without waiting first
+ fmt.Println("Starting services...")
+ err = propertyrepair.ExecuteComposeCommand("-f",
composeFile, "up", "-d")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ // Simple wait for services to be ready
+ time.Sleep(10 * time.Second)
+ })
+
+ ginkgo.It("Should connect to liaison and setup clients", func()
{
+ var err error
+ fmt.Println("Connecting to Liaison server...")
+
+ conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr,
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ fmt.Println("Connected to Liaison server successfully")
+
+ groupClient =
databasev1.NewGroupRegistryServiceClient(conn)
+ propertyClient =
databasev1.NewPropertyRegistryServiceClient(conn)
+ propertyServiceClient =
propertyv1.NewPropertyServiceClient(conn)
+ })
+
+ ginkgo.It("Should create group with 1 replica and write 100k
properties", func() {
+ ctx := context.Background()
+
+ fmt.Println("=== Step 1: Creating group with 1 replica
and loading initial data ===")
+
+ // Create group with 1 replica
+ propertyrepair.CreateGroup(ctx, groupClient, 1)
+
+ // Create property schema
+ propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+ // Write 100,000 properties
+ fmt.Println("Starting to write 100,000 properties...")
+ startTime := time.Now()
+
+ err := propertyrepair.WriteProperties(ctx,
propertyServiceClient, 0, 100000)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 1 completed: wrote 100,000
properties in %v ===\n", duration)
+ })
+ })
+
+ ginkgo.Describe("Step 2: Add 3rd Node and Update Replicas", func() {
+ ginkgo.It("Should update group replicas to 2", func() {
+ ctx := context.Background()
+
+ fmt.Println("Updating group replicas from 1 to 2...")
+ startTime := time.Now()
+
+ // Update group replicas to 2
+ propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2)
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 2 completed: updated replicas to 2
in %v ===\n", duration)
+ })
+ })
+
+ ginkgo.Describe("Verification", func() {
+ ginkgo.It("Should verify the property repair completed and
prometheus metrics", func() {
+ fmt.Println("=== Verification: Property repair process
and prometheus metrics ===")
+
+ // Get initial metrics from all data nodes
+ fmt.Println("Reading initial prometheus metrics from
all data nodes...")
+ beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+ // Print initial metrics state
+ fmt.Println("Initial metrics state:")
+ for _, metrics := range beforeMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be healthy
before verification: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ }
+
+ fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
+ fmt.Println("Waiting for property repair to trigger
(@every 10 minutes)...")
+
+ gomega.Eventually(func() bool {
+ time.Sleep(time.Second * 30)
+ // Get metrics after repair
+ fmt.Println("Trying to reading prometheus
metrics to check repair status...")
+ afterMetrics :=
propertyrepair.GetAllNodeMetrics()
+
propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+ // Check all node health, no crash
+ for _, metrics := range afterMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be
healthy after repair: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ }
+
+ // check the property repair progress finished, and the property must be repaired (at least one success)
+ isAnyRepairFinished := false
+ for i, before := range beforeMetrics {
+ after := afterMetrics[i]
+ if before.TotalPropagationCount <
after.TotalPropagationCount &&
+ before.RepairSuccessCount <
after.RepairSuccessCount {
+ isAnyRepairFinished = true
+ }
+ }
+ return isAnyRepairFinished
+ }, time.Hour*2).Should(gomega.BeTrue(), "Property
repair cycle should complete within the expected time")
+ })
+ })
+})
diff --git a/test/property_repair/half_data/cpu-usage.png
b/test/property_repair/half_data/cpu-usage.png
new file mode 100644
index 00000000..fd367531
Binary files /dev/null and b/test/property_repair/half_data/cpu-usage.png differ
diff --git a/test/property_repair/half_data/docker-compose-3nodes.yml
b/test/property_repair/half_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..ce2a2070
--- /dev/null
+++ b/test/property_repair/half_data/docker-compose-3nodes.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+volumes:
+ node1-data: {}
+ node2-data: {}
+ node3-data: {}
+
+services:
+ etcd:
+ extends:
+ file: ../base-compose.yml
+ service: etcd
+ networks:
+ - half-data
+
+ liaison:
+ extends:
+ file: ../base-compose.yml
+ service: liaison
+ networks:
+ - half-data
+
+ data-node-1:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-1
+ volumes:
+ - node1-data:/tmp/banyandb-data
+ networks:
+ - half-data
+
+ data-node-2:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-2
+ volumes:
+ - node2-data:/tmp/banyandb-data
+ networks:
+ - half-data
+
+ data-node-3:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-3
+ volumes:
+ - node3-data:/tmp/banyandb-data
+ networks:
+ - half-data
+
+ prometheus:
+ extends:
+ file: ../base-compose.yml
+ service: prometheus
+ networks:
+ - half-data
+ volumes:
+ - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+ half-data:
+ driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/half_data/integrated_test.go
b/test/property_repair/half_data/integrated_test.go
new file mode 100644
index 00000000..119d0b49
--- /dev/null
+++ b/test/property_repair/half_data/integrated_test.go
@@ -0,0 +1,219 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package halfdata
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ propertyrepair
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+ composeFile string
+ conn *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ propertyClient databasev1.PropertyRegistryServiceClient
+ propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairHalfData(t *testing.T) {
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Property Repair Half Data Test Suite",
ginkgo.Label("integration", "slow", "property_repair", "half_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+ fmt.Println("Starting Property Repair Half Data Integration Test
Suite...")
+
+ // Disable Ryuk reaper to avoid container creation issues
+ os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+ // Set Docker host if needed (for local development)
+ if os.Getenv("DOCKER_HOST") == "" {
+ os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+ }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+ if conn != nil {
+ _ = conn.Close()
+ }
+ if composeFile != "" {
+ fmt.Println("Stopping compose stack...")
+ _ = propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+ }
+})
+
+var _ = ginkgo.Describe("Property Repair Half Data Test", ginkgo.Ordered,
func() {
+ ginkgo.Describe("Step 1: Initial Setup and Data Load", func() {
+ ginkgo.It("Should start 3 data node cluster", func() {
+ // Initialize compose stack with 3-node configuration
+ var err error
+ composeFile, err =
filepath.Abs("docker-compose-3nodes.yml")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ fmt.Printf("Using compose file: %s\n", composeFile)
+
+ // Start the docker compose stack without waiting first
+ fmt.Println("Starting services...")
+ err = propertyrepair.ExecuteComposeCommand("-f",
composeFile, "up", "-d")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ // Simple wait for services to be ready
+ time.Sleep(10 * time.Second)
+ })
+
+ ginkgo.It("Should connect to liaison and setup clients", func()
{
+ var err error
+ fmt.Println("Connecting to Liaison server...")
+
+ conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr,
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ fmt.Println("Connected to Liaison server successfully")
+
+ groupClient =
databasev1.NewGroupRegistryServiceClient(conn)
+ propertyClient =
databasev1.NewPropertyRegistryServiceClient(conn)
+ propertyServiceClient =
propertyv1.NewPropertyServiceClient(conn)
+ })
+
+ ginkgo.It("Should create group with 2 replicas and write 50k
properties", func() {
+ ctx := context.Background()
+
+ fmt.Println("=== Step 1: Creating group with 2 replicas
and loading initial data ===")
+
+ // Create group with 2 replicas
+ propertyrepair.CreateGroup(ctx, groupClient, 2)
+
+ // Create property schema
+ propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+ // Write 50,000 properties
+ fmt.Println("Starting to write 50,000 properties...")
+ startTime := time.Now()
+
+ err := propertyrepair.WriteProperties(ctx,
propertyServiceClient, 0, 50000)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 1 completed: wrote 50,000
properties in %v ===\n", duration)
+ })
+ })
+
+ ginkgo.Describe("Step 2: Reduce Replicas and Add More Data", func() {
+ ginkgo.It("Should reduce group replicas to 1", func() {
+ ctx := context.Background()
+
+ fmt.Println("Reducing group replicas from 2 to 1...")
+ startTime := time.Now()
+
+ // Update group replicas to 1
+ propertyrepair.UpdateGroupReplicas(ctx, groupClient, 1)
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 2 completed: updated replicas to 1
in %v ===\n", duration)
+ })
+
+ ginkgo.It("Should write additional 50k properties", func() {
+ ctx := context.Background()
+
+ fmt.Println("=== Step 3: Writing additional properties
===")
+ startTime := time.Now()
+
+ // Write another 50,000 properties (50000-100000)
+ err := propertyrepair.WriteProperties(ctx,
propertyServiceClient, 50000, 100000)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 3 completed: wrote additional
50,000 properties in %v ===\n", duration)
+ })
+
+ ginkgo.It("Should increase group replicas back to 2", func() {
+ ctx := context.Background()
+
+ fmt.Println("Increasing group replicas from 1 to 2...")
+ startTime := time.Now()
+
+ // Update group replicas to 2
+ propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2)
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 4 completed: updated replicas to 2
in %v ===\n", duration)
+ })
+ })
+
+ ginkgo.Describe("Verification", func() {
+ ginkgo.It("Should verify the property repair completed and
prometheus metrics", func() {
+ fmt.Println("=== Verification: Property repair process
and prometheus metrics ===")
+
+ // Get initial metrics from all data nodes
+ fmt.Println("Reading initial prometheus metrics from
all data nodes...")
+ beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+ // Print initial metrics state
+ fmt.Println("Initial metrics state:")
+ for _, metrics := range beforeMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be healthy
before verification: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ }
+
+ fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
+ fmt.Println("Waiting for property repair to trigger
(@every 10 minutes)...")
+
+ gomega.Eventually(func() bool {
+ time.Sleep(time.Second * 30)
+ // Get metrics after repair
+ fmt.Println("Trying to reading prometheus
metrics to check repair status...")
+ afterMetrics :=
propertyrepair.GetAllNodeMetrics()
+
propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+ // Check all node health, no crash
+ for _, metrics := range afterMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be
healthy after repair: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ }
+
+ // check the property repair progress finished, and the property must be repaired (at least one success)
+ isAnyRepairFinished := false
+ for i, before := range beforeMetrics {
+ after := afterMetrics[i]
+ if before.TotalPropagationCount <
after.TotalPropagationCount &&
+ before.RepairSuccessCount <
after.RepairSuccessCount {
+ isAnyRepairFinished = true
+ }
+ }
+ return isAnyRepairFinished
+ }, time.Hour*2).Should(gomega.BeTrue(), "Property
repair cycle should complete within the expected time")
+ })
+ })
+})
diff --git a/test/property_repair/prometheus-3nodes.yml
b/test/property_repair/prometheus-3nodes.yml
new file mode 100644
index 00000000..23968dfc
--- /dev/null
+++ b/test/property_repair/prometheus-3nodes.yml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+global:
+ scrape_interval: 15s
+ evaluation_interval: 15s
+
+scrape_configs:
+ - job_name: 'liaison'
+ static_configs:
+ - targets: ['liaison:2121']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ - job_name: 'data-node-1'
+ static_configs:
+ - targets: ['data-node-1:2121']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ - job_name: 'data-node-2'
+ static_configs:
+ - targets: ['data-node-2:2121']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
+
+ - job_name: 'data-node-3'
+ static_configs:
+ - targets: ['data-node-3:2121']
+ scrape_interval: 5s
+ metrics_path: '/metrics'
\ No newline at end of file
diff --git a/test/property_repair/same_data/docker-compose-3nodes.yml
b/test/property_repair/same_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..e0ef9802
--- /dev/null
+++ b/test/property_repair/same_data/docker-compose-3nodes.yml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+services:
+ etcd:
+ extends:
+ file: ../base-compose.yml
+ service: etcd
+ networks:
+ - no-data
+
+ liaison:
+ extends:
+ file: ../base-compose.yml
+ service: liaison
+ networks:
+ - no-data
+
+ data-node-1:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-1
+ networks:
+ - no-data
+
+ data-node-2:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-2
+ networks:
+ - no-data
+
+ data-node-3:
+ extends:
+ file: ../base-compose.yml
+ service: data-node-3
+ networks:
+ - no-data
+
+ prometheus:
+ extends:
+ file: ../base-compose.yml
+ service: prometheus
+ networks:
+ - no-data
+ volumes:
+ - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+ no-data:
+ driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/same_data/integrated_test.go
b/test/property_repair/same_data/integrated_test.go
new file mode 100644
index 00000000..e4674c8c
--- /dev/null
+++ b/test/property_repair/same_data/integrated_test.go
@@ -0,0 +1,178 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package samedata
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ propertyrepair
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+ composeFile string
+ conn *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ propertyClient databasev1.PropertyRegistryServiceClient
+ propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairSameData(t *testing.T) {
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "Property Repair Same Data Test Suite",
ginkgo.Label("integration", "slow", "property_repair", "same_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+ fmt.Println("Starting Property Repair Same Data Integration Test
Suite...")
+
+ // Disable Ryuk reaper to avoid container creation issues
+ os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+ // Set Docker host if needed (for local development)
+ if os.Getenv("DOCKER_HOST") == "" {
+ os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+ }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+ if conn != nil {
+ _ = conn.Close()
+ }
+ if composeFile != "" {
+ fmt.Println("Stopping compose stack...")
+ _ = propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+ }
+})
+
+var _ = ginkgo.Describe("Property Repair Same Data Test", ginkgo.Ordered,
func() {
+ ginkgo.Describe("Step 1: Initial Data Load", func() {
+ ginkgo.It("Should start 3 data node cluster", func() {
+ // Initialize compose stack with 3-node configuration
+ var err error
+ composeFile, err =
filepath.Abs("docker-compose-3nodes.yml")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ fmt.Printf("Using compose file: %s\n", composeFile)
+
+ // Start the docker compose stack without waiting first
+ fmt.Println("Starting services...")
+ err = propertyrepair.ExecuteComposeCommand("-f",
composeFile, "up", "-d")
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ // Simple wait for services to be ready
+ time.Sleep(10 * time.Second)
+ })
+
+ ginkgo.It("Should connect to liaison and setup clients", func()
{
+ var err error
+ fmt.Println("Connecting to Liaison server...")
+
+ conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr,
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ fmt.Println("Connected to Liaison server successfully")
+
+ groupClient =
databasev1.NewGroupRegistryServiceClient(conn)
+ propertyClient =
databasev1.NewPropertyRegistryServiceClient(conn)
+ propertyServiceClient =
propertyv1.NewPropertyServiceClient(conn)
+ })
+
+ ginkgo.It("Should create group with 2 replicas and write 100k
properties", func() {
+ ctx := context.Background()
+
+ fmt.Println("=== Step 1: Creating group with 2 replicas
and loading data ===")
+
+ // Create group with 2 replicas
+ propertyrepair.CreateGroup(ctx, groupClient, 2)
+
+ // Create property schema
+ propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+ // Write 100,000 properties (same amount across all
replicas)
+ fmt.Println("Starting to write 100,000 properties...")
+ startTime := time.Now()
+
+ err := propertyrepair.WriteProperties(ctx,
propertyServiceClient, 0, 100000)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ duration := time.Since(startTime)
+ fmt.Printf("=== Step 1 completed: wrote 100,000
properties in %v ===\n", duration)
+ })
+ })
+
+ ginkgo.Describe("Verification", func() {
+ ginkgo.It("Should verify the property repair completed and
prometheus metrics", func() {
+ fmt.Println("=== Verification: Property repair process
and prometheus metrics ===")
+
+ // Get initial metrics from all data nodes
+ fmt.Println("Reading initial prometheus metrics from
all data nodes...")
+ beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+ // Print initial metrics state
+ fmt.Println("Initial metrics state:")
+ for _, metrics := range beforeMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be healthy
before verification: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ fmt.Printf("- %s: total_propagation_count=%d,
repair_success_count=%d\n",
+ metrics.NodeName,
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+ }
+
+ fmt.Println("\n=== Triggering property repair by
waiting for scheduled repair cycle ===")
+ fmt.Println("Waiting for property repair to trigger
(@every 10 minutes)...")
+
+ gomega.Eventually(func() bool {
+ time.Sleep(time.Second * 30)
+ // Get metrics after repair
+ fmt.Println("Trying to reading prometheus
metrics to check repair status...")
+ afterMetrics :=
propertyrepair.GetAllNodeMetrics()
+
propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+ // Check all node health, no crash
+ for _, metrics := range afterMetrics {
+
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+ fmt.Sprintf("Node %s should be
healthy after repair: %s",
+ metrics.NodeName,
metrics.ErrorMessage))
+ }
+
+ // For same data scenario, only verify
propagation count increased (not repair count)
+ // Since data is consistent, repairs may not be
needed but propagation should still occur
+ isPropagationActive := false
+ for i, before := range beforeMetrics {
+ after := afterMetrics[i]
+ if before.TotalPropagationCount <
after.TotalPropagationCount {
+ isPropagationActive = true
+ break
+ }
+ }
+ return isPropagationActive
+ }, time.Hour*2).Should(gomega.BeTrue(), "Property
propagation should be active even with consistent data")
+ })
+ })
+})
diff --git a/test/property_repair/shared_utils.go
b/test/property_repair/shared_utils.go
new file mode 100644
index 00000000..12c9cebd
--- /dev/null
+++ b/test/property_repair/shared_utils.go
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyrepair provides utilities for property repair performance testing in BanyanDB.
+package propertyrepair
+
+import (
+ "context"
+ "crypto/rand"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "os/exec"
+ "regexp"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+)
+
+// Constants for property repair performance testing.
+const (
+ DataSize = 2048 // 2KB per property
+ LiaisonAddr = "localhost:17912"
+ Concurrency = 6
+ GroupName = "perf-test-group"
+ PropertyName = "perf-test-property"
+)
+
+// PrometheusEndpoints defines the prometheus endpoints for data nodes.
+var PrometheusEndpoints = []string{
+ "http://localhost:2122/metrics", // data-node-1
+ "http://localhost:2123/metrics", // data-node-2
+ "http://localhost:2124/metrics", // data-node-3
+}
+
+// NodeMetrics represents the metrics for a data node.
+type NodeMetrics struct {
+ LastScrapeTime time.Time
+ NodeName string
+ ErrorMessage string
+ TotalPropagationCount int64
+ RepairSuccessCount int64
+ IsHealthy bool
+}
+
+// GenerateLargeData creates a string of specified size filled with random
characters.
+func GenerateLargeData(size int) string {
+ const charset =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+
+ // Generate some random bytes
+ randomBytes := make([]byte, 32)
+ _, err := rand.Read(randomBytes)
+ if err != nil {
+ // Fallback to timestamp-based data
+ baseData := fmt.Sprintf("timestamp-%d-", time.Now().UnixNano())
+ repeats := size / len(baseData)
+ if repeats == 0 {
+ repeats = 1
+ }
+ return strings.Repeat(baseData, repeats)[:size]
+ }
+
+ // Create base string from random bytes
+ var baseBuilder strings.Builder
+ for _, b := range randomBytes {
+ baseBuilder.WriteByte(charset[b%byte(len(charset))])
+ }
+ baseData := baseBuilder.String()
+
+ // Repeat to reach desired size
+ repeats := (size / len(baseData)) + 1
+ result := strings.Repeat(baseData, repeats)
+
+ if len(result) > size {
+ return result[:size]
+ }
+ return result
+}
+
+// FormatDuration formats a duration to a human-readable string.
+func FormatDuration(duration time.Duration) string {
+ if duration < time.Second {
+ return fmt.Sprintf("%dms", duration.Milliseconds())
+ }
+ if duration < time.Minute {
+ return fmt.Sprintf("%.1fs", duration.Seconds())
+ }
+ return fmt.Sprintf("%.1fm", duration.Minutes())
+}
+
+// FormatThroughput calculates and formats throughput.
+func FormatThroughput(count int64, duration time.Duration) string {
+ if duration == 0 {
+ return "N/A"
+ }
+ throughput := float64(count) / duration.Seconds()
+ return fmt.Sprintf("%.1f/s", throughput)
+}
+
+// CreateGroup creates a property group with specified parameters.
+func CreateGroup(ctx context.Context, groupClient
databasev1.GroupRegistryServiceClient, replicaNum uint32) {
+ fmt.Printf("Creating group %s with %d replicas...\n", GroupName,
replicaNum)
+ _, err := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: GroupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ Replicas: replicaNum,
+ },
+ },
+ })
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// UpdateGroupReplicas updates the replica number of an existing group.
+func UpdateGroupReplicas(ctx context.Context, groupClient
databasev1.GroupRegistryServiceClient, newReplicaNum uint32) {
+ fmt.Printf("Updating group %s to %d replicas...\n", GroupName,
newReplicaNum)
+ _, err := groupClient.Update(ctx,
&databasev1.GroupRegistryServiceUpdateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: GroupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ Replicas: newReplicaNum,
+ },
+ },
+ })
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// CreatePropertySchema creates a property schema.
+func CreatePropertySchema(ctx context.Context, propertyClient
databasev1.PropertyRegistryServiceClient) {
+ fmt.Printf("Creating property schema %s...\n", PropertyName)
+ _, err := propertyClient.Create(ctx,
&databasev1.PropertyRegistryServiceCreateRequest{
+ Property: &databasev1.Property{
+ Metadata: &commonv1.Metadata{
+ Name: PropertyName,
+ Group: GroupName,
+ },
+ Tags: []*databasev1.TagSpec{
+ {Name: "data", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "timestamp", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ },
+ },
+ })
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// WriteProperties writes a batch of properties concurrently.
+func WriteProperties(ctx context.Context, propertyServiceClient
propertyv1.PropertyServiceClient,
+ startIdx int, endIdx int,
+) error {
+ fmt.Printf("Starting to write %d-%d properties using %d
goroutines...\n",
+ startIdx, endIdx, Concurrency)
+
+ startTime := time.Now()
+
+ // Channel to generate property data
+ dataChannel := make(chan int, 1000) // Buffer for property indices
+ var wg sync.WaitGroup
+ var totalProcessed int64
+
+ // Start data producer goroutine
+ go func() {
+ defer close(dataChannel)
+ for i := startIdx; i < endIdx; i++ {
+ dataChannel <- i
+ }
+ }()
+
+ // Start consumer goroutines
+ for i := 0; i < Concurrency; i++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer ginkgo.GinkgoRecover()
+ defer wg.Done()
+ var count int64
+
+ for propertyIndex := range dataChannel {
+ propertyID := fmt.Sprintf("property-%d",
propertyIndex)
+ largeData := GenerateLargeData(DataSize)
+ timestamp := time.Now().Format(time.RFC3339Nano)
+
+ _, writeErr := propertyServiceClient.Apply(ctx,
&propertyv1.ApplyRequest{
+ Property: &propertyv1.Property{
+ Metadata: &commonv1.Metadata{
+ Name: PropertyName,
+ Group: GroupName,
+ },
+ Id: propertyID,
+ Tags: []*modelv1.Tag{
+ {Key: "data", Value:
&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
largeData}}}},
+ {Key: "timestamp",
Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
timestamp}}}},
+ },
+ },
+ })
+
gomega.Expect(writeErr).NotTo(gomega.HaveOccurred())
+
+ count++
+ atomic.AddInt64(&totalProcessed, 1)
+
+ if atomic.LoadInt64(&totalProcessed)%500 == 0 {
+ elapsed := time.Since(startTime)
+ totalCount :=
atomic.LoadInt64(&totalProcessed)
+ fmt.Printf("total processed: %d, use:
%v\n", totalCount, elapsed)
+ }
+ }
+
+ fmt.Printf("Worker %d completed: processed %d
properties total\n", workerID, count)
+ }(i)
+ }
+
+ wg.Wait()
+ endTime := time.Now()
+ duration := endTime.Sub(startTime)
+ fmt.Printf("Write completed: %d properties in %s (%s props/sec)\n",
+ endIdx, FormatDuration(duration),
FormatThroughput(int64(endIdx), duration))
+ return nil
+}
+
+// GetNodeMetrics fetches prometheus metrics from a single data node endpoint.
+func GetNodeMetrics(endpoint string, nodeIndex int) *NodeMetrics {
+ nodeName := fmt.Sprintf("data-node-%d", nodeIndex+1)
+ metrics := &NodeMetrics{
+ NodeName: nodeName,
+ LastScrapeTime: time.Now(),
+ IsHealthy: false,
+ }
+
+ // Set timeout for HTTP request
+ client := &http.Client{
+ Timeout: 10 * time.Second,
+ }
+
+ resp, err := client.Get(endpoint)
+ if err != nil {
+ metrics.ErrorMessage = fmt.Sprintf("Failed to connect to %s:
%v", endpoint, err)
+ return metrics
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ metrics.ErrorMessage = fmt.Sprintf("HTTP error %d from %s",
resp.StatusCode, endpoint)
+ return metrics
+ }
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ metrics.ErrorMessage = fmt.Sprintf("Failed to read response
from %s: %v", endpoint, err)
+ return metrics
+ }
+
+ // Parse metrics from prometheus data
+ content := string(body)
+ totalPropagationCount := parseTotalPropagationCount(content)
+ repairSuccessCount := parseRepairSuccessCount(content)
+
+ metrics.TotalPropagationCount = totalPropagationCount
+ metrics.RepairSuccessCount = repairSuccessCount
+ metrics.IsHealthy = true
+ return metrics
+}
+
+// parseTotalPropagationCount parses the total_propagation_count from
prometheus metrics text.
+func parseTotalPropagationCount(content string) int64 {
+ // Look for metric lines like:
banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"}
3
+ re :=
regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// parseRepairSuccessCount parses the repair_success_count from prometheus
metrics text.
+func parseRepairSuccessCount(content string) int64 {
+ // Look for metric lines like:
banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"}
100
+ re :=
regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+ matches := re.FindAllStringSubmatch(content, -1)
+
+ var totalCount int64
+ for _, match := range matches {
+ if len(match) >= 2 {
+ value, err := strconv.ParseFloat(match[1], 64)
+ if err != nil {
+ continue
+ }
+ totalCount += int64(value)
+ }
+ }
+
+ return totalCount
+}
+
+// GetAllNodeMetrics fetches metrics from all data nodes concurrently.
+func GetAllNodeMetrics() []*NodeMetrics {
+ var wg sync.WaitGroup
+ metrics := make([]*NodeMetrics, len(PrometheusEndpoints))
+
+ for i, endpoint := range PrometheusEndpoints {
+ wg.Add(1)
+ go func(index int, url string) {
+ defer wg.Done()
+ metrics[index] = GetNodeMetrics(url, index)
+ }(i, endpoint)
+ }
+
+ wg.Wait()
+ return metrics
+}
+
+// VerifyPropagationCountIncreased compares metrics before and after to verify
total_propagation_count increased by exactly 1.
+func VerifyPropagationCountIncreased(beforeMetrics, afterMetrics
[]*NodeMetrics) error {
+ if len(beforeMetrics) != len(afterMetrics) {
+ return fmt.Errorf("metrics array length mismatch: before=%d,
after=%d", len(beforeMetrics), len(afterMetrics))
+ }
+
+ for i, after := range afterMetrics {
+ before := beforeMetrics[i]
+
+ if !after.IsHealthy {
+ return fmt.Errorf("node %s is not healthy after update:
%s", after.NodeName, after.ErrorMessage)
+ }
+
+ if !before.IsHealthy {
+ return fmt.Errorf("node %s was not healthy before
update: %s", before.NodeName, before.ErrorMessage)
+ }
+
+ expectedCount := before.TotalPropagationCount + 1
+ if after.TotalPropagationCount != expectedCount {
+ return fmt.Errorf("node %s propagation count mismatch:
expected=%d, actual=%d (before=%d)",
+ after.NodeName, expectedCount,
after.TotalPropagationCount, before.TotalPropagationCount)
+ }
+ }
+
+ return nil
+}
+
+// PrintMetricsComparison prints a comparison of metrics before and after.
+func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) {
+ fmt.Println("=== Prometheus Metrics Comparison ===")
+ fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation
Count", "Repair Success Count", "Healthy")
+ fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "",
"Before", "After", "Delta", "Before", "After", "Delta", "")
+ fmt.Println(strings.Repeat("-", 85))
+
+ for i, after := range afterMetrics {
+ if i < len(beforeMetrics) {
+ before := beforeMetrics[i]
+ propagationDelta := after.TotalPropagationCount -
before.TotalPropagationCount
+ repairDelta := after.RepairSuccessCount -
before.RepairSuccessCount
+ healthStatus := "✓"
+ if !after.IsHealthy {
+ healthStatus = "✗"
+ }
+
+ fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d |
%-7s\n",
+ after.NodeName,
+ before.TotalPropagationCount,
after.TotalPropagationCount, propagationDelta,
+ before.RepairSuccessCount,
after.RepairSuccessCount, repairDelta,
+ healthStatus)
+ }
+ }
+ fmt.Println()
+}
+
+// ExecuteComposeCommand executes a docker-compose command, supporting both v1
and v2.
+func ExecuteComposeCommand(args ...string) error {
+ // v2
+ if _, err := exec.LookPath("docker"); err == nil {
+ check := exec.Command("docker", "compose", "version")
+ if out, err := check.CombinedOutput(); err == nil &&
strings.Contains(string(out), "Docker Compose") {
+ composeArgs := append([]string{"compose"}, args...)
+ cmd := exec.Command("docker", composeArgs...)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ return cmd.Run()
+ }
+ }
+
+ // v1
+ if _, err := exec.LookPath("docker-compose"); err == nil {
+ cmd := exec.Command("docker-compose", args...)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ return cmd.Run()
+ }
+
+ return fmt.Errorf("neither 'docker compose' (v2) nor 'docker-compose' (v1) was found in PATH")
+}