This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 38f19094 Implement Handoff Queue for Trace (#832)
38f19094 is described below
commit 38f19094145a30c6403fbc4390d0b0cf57494725
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 3 20:53:57 2025 +0800
Implement Handoff Queue for Trace (#832)
---
CHANGES.md | 1 +
banyand/internal/sidx/interfaces.go | 3 +
banyand/internal/sidx/sidx.go | 51 +
banyand/internal/sidx/sidx_test.go | 43 +
banyand/queue/local.go | 4 +
banyand/queue/pub/pub.go | 12 +
banyand/queue/queue.go | 1 +
banyand/trace/handoff_controller.go | 1261 ++++++++++++++++++++++++
banyand/trace/handoff_replay_test.go | 406 ++++++++
banyand/trace/handoff_storage.go | 373 +++++++
banyand/trace/handoff_storage_test.go | 692 +++++++++++++
banyand/trace/metadata.go | 9 +-
banyand/trace/streaming_pipeline_test.go | 7 +-
banyand/trace/svc_liaison.go | 116 ++-
banyand/trace/syncer.go | 4 +
banyand/trace/tstable.go | 57 ++
banyand/trace/wqueue.go | 2 +
banyand/trace/wqueue_test.go | 1 +
pkg/fs/file_system.go | 2 +
pkg/fs/local_file_system.go | 8 +
test/integration/handoff/handoff_suite_test.go | 584 +++++++++++
21 files changed, 3622 insertions(+), 15 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 73fb4557..0b0e8b89 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -53,6 +53,7 @@ Release Notes.
- Implement Trace Tree for debug mode.
- Implement bydbQL.
- UI: Implement the Query Page for BydbQL.
+- Implement the handoff queue for Trace.
### Bug Fixes
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 5d80d6e2..f1570e53 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -58,6 +58,9 @@ type SIDX interface {
Merge(closeCh <-chan struct{}, partIDstoMerge map[uint64]struct{}, newPartID uint64) (*MergerIntroduction, error)
// StreamingParts returns the streaming parts.
StreamingParts(partIDsToSync map[uint64]struct{}, group string, shardID uint32, name string) ([]queue.StreamingPartData, []func())
+ // PartPaths returns filesystem paths for the requested partIDs keyed by partID.
+ // Missing partIDs are omitted from the returned map.
+ PartPaths(partIDs map[uint64]struct{}) map[uint64]string
// IntroduceSynced introduces a synced map to the SIDX instance.
IntroduceSynced(partIDsToSync map[uint64]struct{}) func()
}
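
The new PartPaths accessor above only reports parts that already have a stable on-disk location; IDs that are still memory-only (or unknown) are simply absent from the returned map. A minimal caller-side sketch, not part of this commit (the helper name resolveDiskPaths and the wanted set are illustrative):

package example

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
)

// resolveDiskPaths asks a SIDX instance for the on-disk locations of the
// requested parts and reports the ones that are not flushed yet, which are
// omitted from the returned map.
func resolveDiskPaths(idx sidx.SIDX, wanted map[uint64]struct{}) {
	paths := idx.PartPaths(wanted)
	for id := range wanted {
		if p, ok := paths[id]; ok {
			fmt.Printf("part %d is on disk at %s\n", id, p)
		} else {
			fmt.Printf("part %d is memory-only or unknown\n", id)
		}
	}
}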
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index e48a7f32..f2cc93e9 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -159,6 +159,57 @@ func (s *sidx) Flush(partIDsToFlush map[uint64]struct{}) (*FlusherIntroduction,
return flushIntro, nil
}
+// PartPaths implements SIDX interface and returns on-disk paths for the requested part IDs.
+func (s *sidx) PartPaths(partIDs map[uint64]struct{}) map[uint64]string {
+ if len(partIDs) == 0 {
+ return map[uint64]string{}
+ }
+
+ snap := s.currentSnapshot()
+ if snap == nil {
+ return map[uint64]string{}
+ }
+ defer snap.decRef()
+
+ result := make(map[uint64]string, len(partIDs))
+
+ for _, pw := range snap.parts {
+ var (
+ id uint64
+ path string
+ )
+
+ switch {
+ case pw.p != nil && pw.p.partMetadata != nil:
+ id = pw.p.partMetadata.ID
+ path = pw.p.path
+ case pw.mp != nil && pw.mp.partMetadata != nil:
+ id = pw.mp.partMetadata.ID
+ default:
+ continue
+ }
+
+ if _, ok := partIDs[id]; !ok {
+ continue
+ }
+ // Skip mem parts since they do not have stable on-disk paths yet.
+ if pw.isMemPart() || pw.p == nil {
+ continue
+ }
+ if path == "" {
+ path = partPath(s.root, id)
+ }
+ result[id] = path
+
+ // All requested IDs found.
+ if len(result) == len(partIDs) {
+ break
+ }
+ }
+
+ return result
+}
+
// Close implements SIDX interface.
func (s *sidx) Close() error {
// Close current snapshot
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index aad90147..ebd9a0c8 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -214,6 +214,49 @@ func TestSIDX_Write_WithTags(t *testing.T) {
writeTestData(t, sidx, reqs, 3, 3) // Test with segmentID=3, partID=3
}
+func TestSIDX_PartPaths(t *testing.T) {
+ sidxIface := createTestSIDX(t)
+ raw := sidxIface.(*sidx)
+ defer func() {
+ assert.NoError(t, raw.Close())
+ }()
+
+ const (
+ flushedID = uint64(101)
+ memOnlyID = uint64(202)
+ missingID = uint64(303)
+ )
+
+ req := []WriteRequest{
+ createTestWriteRequest(1, 100, "flushed-data"),
+ }
+
+ // Introduce a part that will be flushed to disk and another that stays in memory.
+ writeTestData(t, raw, req, 1, flushedID)
+ writeTestData(t, raw, req, 2, memOnlyID)
+
+ flushIntro, err := raw.Flush(map[uint64]struct{}{flushedID: {}})
+ require.NoError(t, err)
+ require.NotNil(t, flushIntro)
+ raw.IntroduceFlushed(flushIntro)
+ flushIntro.Release()
+
+ // Empty request should return an empty map.
+ require.Empty(t, raw.PartPaths(map[uint64]struct{}{}))
+
+ paths := raw.PartPaths(map[uint64]struct{}{
+ flushedID: {},
+ memOnlyID: {},
+ missingID: {},
+ })
+
+ require.Len(t, paths, 1)
+ expectedPath := partPath(raw.root, flushedID)
+ assert.Equal(t, expectedPath, paths[flushedID])
+ assert.NotContains(t, paths, memOnlyID)
+ assert.NotContains(t, paths, missingID)
+}
+
// End-to-End Integration Tests.
func TestSIDX_WriteQueryIntegration(t *testing.T) {
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 1bb44a0c..f7fa41de 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -99,6 +99,10 @@ func (*local) GetPort() *uint32 {
func (*local) Register(bus.Topic, schema.EventHandler) {
}
+func (*local) HealthyNodes() []string {
+ return nil
+}
+
type localBatchPublisher struct {
ctx context.Context
local *bus.Bus
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e8a397ed..fd69d37a 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -480,3 +480,15 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, config *ChunkedSyncCli
config: config,
}, nil
}
+
+// HealthyNodes returns a list of node names that are currently healthy and connected.
+func (p *pub) HealthyNodes() []string {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ nodes := make([]string, 0, len(p.active))
+ for name := range p.active {
+ nodes = append(nodes, name)
+ }
+ return nodes
+}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 14cab1ee..7193f9a9 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -50,6 +50,7 @@ type Client interface {
Register(bus.Topic, schema.EventHandler)
OnAddOrUpdate(md schema.Metadata)
GracefulStop()
+ HealthyNodes() []string
}
// Server is the interface for receiving data from the queue.
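
HealthyNodes gives liaison-side code a snapshot of which data nodes the pub client currently considers reachable; the in-process local queue reports none. A hedged sketch of how a caller might derive the offline set from it (offlineNodes is a hypothetical helper, not code from this commit):

package example

import "github.com/apache/skywalking-banyandb/banyand/queue"

// offlineNodes compares the configured data-node list against the nodes the
// client reports as healthy and returns the ones that are missing. The
// handoff controller added below applies the same comparison when deciding
// which nodes need their parts queued on disk.
func offlineNodes(client queue.Client, configured []string) []string {
	healthy := make(map[string]struct{})
	for _, n := range client.HealthyNodes() {
		healthy[n] = struct{}{}
	}
	var offline []string
	for _, n := range configured {
		if _, ok := healthy[n]; !ok {
			offline = append(offline, n)
		}
	}
	return offline
}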
diff --git a/banyand/trace/handoff_controller.go b/banyand/trace/handoff_controller.go
new file mode 100644
index 00000000..4ba09c2e
--- /dev/null
+++ b/banyand/trace/handoff_controller.go
@@ -0,0 +1,1261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// handoffController manages handoff queues for multiple data nodes.
+type handoffController struct {
+ fileSystem fs.FileSystem
+ tire2Client queueClient
+ resolveShardAssignments func(group string, shardID uint32) ([]string, error)
+ inFlightSends map[string]map[uint64]struct{}
+ l *logger.Logger
+ replayTriggerChan chan string
+ nodeQueues map[string]*handoffNodeQueue
+ replayStopChan chan struct{}
+ healthyNodes map[string]struct{}
+ statusChangeChan chan nodeStatusChange
+ stopMonitor chan struct{}
+ root string
+ allDataNodes []string
+ monitorWg sync.WaitGroup
+ replayWg sync.WaitGroup
+ checkInterval time.Duration
+ replayBatchSize int
+ replayPollInterval time.Duration
+ maxTotalSizeBytes uint64
+ currentTotalSize uint64
+ mu sync.RWMutex
+ inFlightMu sync.RWMutex
+ sizeMu sync.RWMutex
+}
+
+// nodeStatusChange represents a node status transition.
+type nodeStatusChange struct {
+ nodeName string
+ isOnline bool
+}
+
+// queueClient interface combines health checking and sync client creation.
+type queueClient interface {
+ HealthyNodes() []string
+ NewChunkedSyncClient(node string, chunkSize uint32) (queue.ChunkedSyncClient, error)
+}
+
+// newHandoffController creates a new handoff controller.
+func newHandoffController(fileSystem fs.FileSystem, root string, tire2Client queueClient,
+ dataNodeList []string, maxSize int, l *logger.Logger,
+ resolveShardAssignments func(group string, shardID uint32) ([]string, error),
+) (*handoffController, error) {
+ if fileSystem == nil {
+ return nil, fmt.Errorf("fileSystem is nil")
+ }
+ if l == nil {
+ return nil, fmt.Errorf("logger is nil")
+ }
+ if root == "" {
+ return nil, fmt.Errorf("root path is empty")
+ }
+
+ handoffRoot := filepath.Join(root, "handoff", "nodes")
+
+ hc := &handoffController{
+ l: l,
+ fileSystem: fileSystem,
+ nodeQueues: make(map[string]*handoffNodeQueue),
+ root: handoffRoot,
+ tire2Client: tire2Client,
+ allDataNodes: dataNodeList,
+ healthyNodes: make(map[string]struct{}),
+ statusChangeChan: make(chan nodeStatusChange, 100),
+ stopMonitor: make(chan struct{}),
+ checkInterval: 5 * time.Second,
+ replayStopChan: make(chan struct{}),
+ replayTriggerChan: make(chan string, 100),
+ inFlightSends: make(map[string]map[uint64]struct{}),
+ replayBatchSize: 10,
+ replayPollInterval: 1 * time.Second,
+ maxTotalSizeBytes: uint64(maxSize),
+ currentTotalSize: 0,
+ resolveShardAssignments: resolveShardAssignments,
+ }
+
+ // Create handoff root directory if it doesn't exist
+ fileSystem.MkdirIfNotExist(handoffRoot, storage.DirPerm)
+
+ // Load existing node queues from disk
+ if err := hc.loadExistingQueues(); err != nil {
+ l.Warn().Err(err).Msg("failed to load existing handoff queues")
+ }
+
+ // Start the node status monitor
+ hc.startMonitor()
+
+ // Start the replay worker
+ hc.startReplayWorker()
+
+ return hc, nil
+}
+
+// loadExistingQueues scans the handoff directory and loads existing node queues.
+func (hc *handoffController) loadExistingQueues() error {
+ if hc == nil || hc.fileSystem == nil {
+ return fmt.Errorf("handoff controller is not initialized")
+ }
+
+ entries := hc.fileSystem.ReadDir(hc.root)
+ var totalRecoveredSize uint64
+ var errs []error
+
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+
+ nodeRoot := filepath.Join(hc.root, entry.Name())
+
+ // Read the original node address from .node_info file
+ nodeAddr, err := readNodeInfo(hc.fileSystem, nodeRoot)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("dir", entry.Name()).Msg("failed to read node info, skipping")
+ errs = append(errs, fmt.Errorf("read node info for %s: %w", entry.Name(), err))
+ continue
+ }
+
+ nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed to load node queue")
+ errs = append(errs, fmt.Errorf("load node queue for %s: %w", nodeAddr, err))
+ continue
+ }
+
+ hc.nodeQueues[nodeAddr] = nodeQueue
+
+ // Calculate queue size from metadata (not from actual file sizes)
+ pending, err := nodeQueue.listPending()
+ if err != nil {
+ hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed to list pending parts")
+ errs = append(errs, fmt.Errorf("list pending for %s: %w", nodeAddr, err))
+ continue
+ }
+ var queueSize uint64
+ for _, ptp := range pending {
+ meta, err := nodeQueue.getMetadata(ptp.PartID, ptp.PartType)
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("failed to read part metadata")
+ errs = append(errs, fmt.Errorf("metadata for node %s part %x (%s): %w",
+ nodeAddr, ptp.PartID, ptp.PartType, err))
+ continue
+ }
+ if meta.PartSizeBytes > 0 {
+ queueSize += meta.PartSizeBytes
+ }
+ }
+ totalRecoveredSize += queueSize
+
+ // Log pending parts count
+ if len(pending) > 0 {
+ hc.l.Info().
+ Str("node", nodeAddr).
+ Int("pending", len(pending)).
+ Uint64("sizeMB", queueSize/1024/1024).
+ Msg("loaded handoff queue with pending parts")
+ }
+ }
+
+ // Update current total size
+ hc.currentTotalSize = totalRecoveredSize
+ if totalRecoveredSize > 0 {
+ hc.l.Info().
+ Uint64("totalSizeMB", totalRecoveredSize/1024/1024).
+ Int("nodeCount", len(hc.nodeQueues)).
+ Msg("recovered handoff queue state")
+ }
+ if len(errs) > 0 {
+ return errors.Join(errs...)
+ }
+
+ return nil
+}
+
+// enqueueForNode adds a part to the handoff queue for a specific node.
+func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, partType string, sourcePath string,
+ group string, shardID uint32,
+) error {
+ // Read part size from metadata
+ partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
+
+ // Check if enqueue would exceed limit
+ if !hc.canEnqueue(partSize) {
+ currentSize := hc.getTotalSize()
+ return fmt.Errorf("handoff queue full: current=%d MB, limit=%d MB, part=%d MB",
+ currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024, partSize/1024/1024)
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: group,
+ ShardID: shardID,
+ PartType: partType,
+ PartSizeBytes: partSize,
+ }
+
+ nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+ if err != nil {
+ return fmt.Errorf("failed to get node queue for %s: %w", nodeAddr, err)
+ }
+
+ if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil {
+ return err
+ }
+
+ // Update total size after successful enqueue
+ hc.updateTotalSize(int64(partSize))
+
+ return nil
+}
+
+// enqueueForNodes adds a part to the handoff queues for multiple offline nodes.
+func (hc *handoffController) enqueueForNodes(offlineNodes []string, partID uint64, partType string, sourcePath string,
+ group string, shardID uint32,
+) error {
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: group,
+ ShardID: shardID,
+ PartType: partType,
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ var firstErr error
+ successCount := 0
+
+ // For each offline node, create hard-linked copy
+ for _, nodeAddr := range offlineNodes {
+ nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+ if err != nil {
+ hc.l.Error().Err(err).Str("node", nodeAddr).Msg("failed to get node queue")
+ if firstErr == nil {
+ firstErr = err
+ }
+ continue
+ }
+
+ if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil {
+ hc.l.Error().Err(err).Str("node", nodeAddr).Uint64("partId", partID).Str("partType", partType).
+ Msg("failed to enqueue part")
+ if firstErr == nil {
+ firstErr = err
+ }
+ continue
+ }
+
+ successCount++
+ }
+
+ if successCount > 0 {
+ hc.l.Info().
+ Int("successCount", successCount).
+ Int("totalNodes", len(offlineNodes)).
+ Uint64("partId", partID).
+ Str("partType", partType).
+ Msg("part enqueued to handoff queues")
+ }
+
+ // Return error only if all enqueues failed
+ if successCount == 0 && firstErr != nil {
+ return firstErr
+ }
+
+ return nil
+}
+
+// getOrCreateNodeQueue gets an existing node queue or creates a new one.
+// Caller must hold hc.mu lock.
+func (hc *handoffController) getOrCreateNodeQueue(nodeAddr string) (*handoffNodeQueue, error) {
+ // Check if queue already exists
+ if queue, exists := hc.nodeQueues[nodeAddr]; exists {
+ return queue, nil
+ }
+
+ // Create new queue
+ sanitizedAddr := sanitizeNodeAddr(nodeAddr)
+ nodeRoot := filepath.Join(hc.root, sanitizedAddr)
+
+ nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create node queue: %w", err)
+ }
+
+ hc.nodeQueues[nodeAddr] = nodeQueue
+ return nodeQueue, nil
+}
+
+// listPendingForNode returns all pending parts with their types for a specific node.
+func (hc *handoffController) listPendingForNode(nodeAddr string) ([]partTypePair, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return nil, nil // No queue means no pending parts
+ }
+
+ return nodeQueue.listPending()
+}
+
+// getPartPath returns the path to a specific part type directory in a node's handoff queue.
+func (hc *handoffController) getPartPath(nodeAddr string, partID uint64, partType string) string {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return ""
+ }
+
+ return nodeQueue.getPartTypePath(partID, partType)
+}
+
+// getPartMetadata returns the handoff metadata for a specific part type.
+func (hc *handoffController) getPartMetadata(nodeAddr string, partID uint64, partType string) (*handoffMetadata, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return nil, fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ return nodeQueue.getMetadata(partID, partType)
+}
+
+// completeSend removes a specific part type from a node's handoff queue after successful delivery.
+func (hc *handoffController) completeSend(nodeAddr string, partID uint64, partType string) error {
+ // Get part size before removing
+ var partSize uint64
+ meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+ if err == nil && meta.PartSizeBytes > 0 {
+ partSize = meta.PartSizeBytes
+ }
+
+ hc.mu.RLock()
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ hc.mu.RUnlock()
+
+ if !exists {
+ return fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ if err := nodeQueue.complete(partID, partType); err != nil {
+ return err
+ }
+
+ // Update total size after successful removal
+ if partSize > 0 {
+ hc.updateTotalSize(-int64(partSize))
+ }
+
+ return nil
+}
+
+// completeSendAll removes all part types for a given partID from a node's handoff queue.
+func (hc *handoffController) completeSendAll(nodeAddr string, partID uint64) error {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ return nodeQueue.completeAll(partID)
+}
+
+// getNodeQueueSize returns the total size of pending parts for a specific node.
+func (hc *handoffController) getNodeQueueSize(nodeAddr string) (uint64, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return 0, nil
+ }
+
+ return nodeQueue.size()
+}
+
+// getAllNodeQueues returns a snapshot of all node addresses with handoff queues.
+func (hc *handoffController) getAllNodeQueues() []string {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodes := make([]string, 0, len(hc.nodeQueues))
+ for nodeAddr := range hc.nodeQueues {
+ nodes = append(nodes, nodeAddr)
+ }
+
+ return nodes
+}
+
+// partInfo contains information about a part to be enqueued.
+type partInfo struct {
+ path string
+ group string
+ partID uint64
+ shardID common.ShardID
+}
+
+// calculateOfflineNodes returns the list of offline nodes responsible for the shard.
+func (hc *handoffController) calculateOfflineNodes(onlineNodes []string, group string, shardID common.ShardID) []string {
+ if hc == nil {
+ return nil
+ }
+
+ onlineSet := make(map[string]struct{}, len(onlineNodes))
+ for _, node := range onlineNodes {
+ onlineSet[node] = struct{}{}
+ }
+
+ candidates := hc.nodesForShard(group, uint32(shardID))
+ seen := make(map[string]struct{}, len(candidates))
+ var offlineNodes []string
+ for _, node := range candidates {
+ if _, dup := seen[node]; dup {
+ continue
+ }
+ seen[node] = struct{}{}
+ if !hc.isNodeHealthy(node) {
+ offlineNodes = append(offlineNodes, node)
+ continue
+ }
+ if len(onlineSet) > 0 {
+ if _, isOnline := onlineSet[node]; !isOnline {
+ offlineNodes = append(offlineNodes, node)
+ }
+ continue
+ }
+ if hc.tire2Client == nil {
+ offlineNodes = append(offlineNodes, node)
+ }
+ }
+
+ return offlineNodes
+}
+
+// enqueueForOfflineNodes enqueues the provided core and sidx parts for each offline node.
+func (hc *handoffController) enqueueForOfflineNodes(
+ offlineNodes []string,
+ coreParts []partInfo,
+ sidxParts map[string][]partInfo,
+) error {
+ if hc == nil || len(offlineNodes) == 0 {
+ return nil
+ }
+
+ group, shardID, hasShardInfo := extractShardDetails(coreParts, sidxParts)
+ if hasShardInfo {
+ filtered := hc.filterNodesForShard(offlineNodes, group, shardID)
+ if len(filtered) == 0 {
+ hc.l.Debug().
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Msg("no offline shard owners to enqueue")
+ return nil
+ }
+ offlineNodes = filtered
+ }
+
+ // Track enqueue statistics
+ totalCoreEnqueued := 0
+ totalSidxEnqueued := 0
+ var lastErr error
+
+ // For each offline node, enqueue all parts
+ for _, nodeAddr := range offlineNodes {
+ // Enqueue core parts
+ for _, coreInfo := range coreParts {
+ err := hc.enqueueForNode(nodeAddr, coreInfo.partID, PartTypeCore, coreInfo.path, coreInfo.group, uint32(coreInfo.shardID))
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", coreInfo.partID).
+ Msg("failed to enqueue core part")
+ lastErr = err
+ } else {
+ totalCoreEnqueued++
+ }
+ }
+
+ // Enqueue sidx parts
+ for sidxName, parts := range sidxParts {
+ for _, sidxInfo := range parts {
+ err := hc.enqueueForNode(nodeAddr, sidxInfo.partID, sidxName, sidxInfo.path, sidxInfo.group, uint32(sidxInfo.shardID))
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Str("sidx", sidxName).
+ Uint64("partID", sidxInfo.partID).
+ Msg("failed to enqueue sidx part")
+ lastErr = err
+ } else {
+ totalSidxEnqueued++
+ }
+ }
+ }
+ }
+
+ // Log summary
+ hc.l.Info().
+ Int("offlineNodes", len(offlineNodes)).
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Int("corePartsEnqueued", totalCoreEnqueued).
+ Int("sidxPartsEnqueued", totalSidxEnqueued).
+ Msg("enqueued parts for offline nodes")
+
+ return lastErr
+}
+
+func extractShardDetails(coreParts []partInfo, sidxParts map[string][]partInfo) (string, uint32, bool) {
+ if len(coreParts) > 0 {
+ return coreParts[0].group, uint32(coreParts[0].shardID), true
+ }
+ for _, parts := range sidxParts {
+ if len(parts) == 0 {
+ continue
+ }
+ return parts[0].group, uint32(parts[0].shardID), true
+ }
+ return "", 0, false
+}
+
+func (hc *handoffController) nodesForShard(group string, shardID uint32) []string {
+ if hc == nil {
+ return nil
+ }
+ if hc.resolveShardAssignments == nil {
+ return hc.allDataNodes
+ }
+ nodes, err := hc.resolveShardAssignments(group, shardID)
+ if err != nil {
+ if hc.l != nil {
+ hc.l.Warn().
+ Err(err).
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Msg("failed to resolve shard assignments, using configured node list")
+ }
+ return hc.allDataNodes
+ }
+ if len(nodes) == 0 {
+ return hc.allDataNodes
+ }
+ return nodes
+}
+
+func (hc *handoffController) filterNodesForShard(nodes []string, group string, shardID uint32) []string {
+ candidates := hc.nodesForShard(group, shardID)
+ if len(candidates) == 0 {
+ return nil
+ }
+ candidateSet := make(map[string]struct{}, len(candidates))
+ for _, node := range candidates {
+ candidateSet[node] = struct{}{}
+ }
+ filtered := make([]string, 0, len(nodes))
+ seen := make(map[string]struct{}, len(nodes))
+ for _, node := range nodes {
+ if _, already := seen[node]; already {
+ continue
+ }
+ seen[node] = struct{}{}
+ if _, ok := candidateSet[node]; ok {
+ filtered = append(filtered, node)
+ }
+ }
+ return filtered
+}
+
+// close closes the handoff controller.
+func (hc *handoffController) close() error {
+ // Stop the monitor
+ if hc.stopMonitor != nil {
+ close(hc.stopMonitor)
+ hc.monitorWg.Wait()
+ }
+
+ // Stop the replay worker
+ if hc.replayStopChan != nil {
+ close(hc.replayStopChan)
+ hc.replayWg.Wait()
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ // Clear node queues
+ hc.nodeQueues = nil
+
+ // Clear in-flight tracking
+ hc.inFlightSends = nil
+
+ return nil
+}
+
+// getTotalSize returns the current total size across all node queues.
+func (hc *handoffController) getTotalSize() uint64 {
+ hc.sizeMu.RLock()
+ defer hc.sizeMu.RUnlock()
+ return hc.currentTotalSize
+}
+
+// canEnqueue checks if adding a part of the given size would exceed the total size limit.
+func (hc *handoffController) canEnqueue(partSize uint64) bool {
+ if hc.maxTotalSizeBytes == 0 {
+ return true // No limit configured
+ }
+
+ hc.sizeMu.RLock()
+ defer hc.sizeMu.RUnlock()
+ return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+}
+
+// readPartSizeFromMetadata reads the CompressedSizeBytes from the part's metadata file.
+func (hc *handoffController) readPartSizeFromMetadata(sourcePath, partType string) uint64 {
+ var metadataPath string
+
+ // Core parts use metadata.json, sidx parts use manifest.json
+ if partType == PartTypeCore {
+ metadataPath = filepath.Join(sourcePath, metadataFilename) // "metadata.json"
+ } else {
+ metadataPath = filepath.Join(sourcePath, "manifest.json")
+ }
+
+ // Read metadata file
+ data, err := hc.fileSystem.Read(metadataPath)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to read metadata file")
+ return 0
+ }
+
+ // Parse metadata to get CompressedSizeBytes
+ var metadata struct {
+ CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+ }
+ if err := json.Unmarshal(data, &metadata); err != nil {
+ hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to parse metadata")
+ return 0
+ }
+
+ return metadata.CompressedSizeBytes
+}
+
+// updateTotalSize atomically updates the current total size.
+func (hc *handoffController) updateTotalSize(delta int64) {
+ hc.sizeMu.Lock()
+ defer hc.sizeMu.Unlock()
+
+ if delta > 0 {
+ // Enqueue: add to total
+ hc.currentTotalSize += uint64(delta)
+ } else if delta < 0 {
+ // Complete: subtract from total with underflow check
+ toSubtract := uint64(-delta)
+ if toSubtract > hc.currentTotalSize {
+ hc.l.Warn().
+ Uint64("current", hc.currentTotalSize).
+ Uint64("toSubtract", toSubtract).
+ Msg("attempted to subtract more than current size, resetting to 0")
+ hc.currentTotalSize = 0
+ } else {
+ hc.currentTotalSize -= toSubtract
+ }
+ }
+}
+
+// sanitizeNodeAddr converts a node address to a safe directory name.
+// It replaces colons and other special characters with underscores.
+func sanitizeNodeAddr(addr string) string {
+ // Replace common special characters
+ sanitized := strings.ReplaceAll(addr, ":", "_")
+ sanitized = strings.ReplaceAll(sanitized, "/", "_")
+ sanitized = strings.ReplaceAll(sanitized, "\\", "_")
+ return sanitized
+}
+
+// startMonitor starts the background node status monitoring goroutine.
+func (hc *handoffController) startMonitor() {
+ if hc.tire2Client == nil || len(hc.allDataNodes) == 0 {
+ hc.l.Info().Msg("node status monitor disabled (no tire2 client or data nodes)")
+ return
+ }
+
+ // Initialize healthy nodes from tire2 client
+ currentHealthy := hc.tire2Client.HealthyNodes()
+ for _, node := range currentHealthy {
+ hc.healthyNodes[node] = struct{}{}
+ }
+
+ hc.monitorWg.Add(2)
+
+ // Goroutine 1: Periodic polling
+ go hc.pollNodeStatus()
+
+ // Goroutine 2: Handle status changes
+ go hc.handleStatusChanges()
+
+ hc.l.Info().
+ Int("dataNodes", len(hc.allDataNodes)).
+ Dur("checkInterval", hc.checkInterval).
+ Msg("node status monitor started")
+}
+
+// pollNodeStatus periodically polls the pub client for healthy nodes.
+func (hc *handoffController) pollNodeStatus() {
+ defer hc.monitorWg.Done()
+
+ ticker := time.NewTicker(hc.checkInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ hc.checkAndNotifyStatusChanges()
+ case <-hc.stopMonitor:
+ return
+ }
+ }
+}
+
+// checkAndNotifyStatusChanges compares current vs previous health status.
+func (hc *handoffController) checkAndNotifyStatusChanges() {
+ if hc.tire2Client == nil {
+ return
+ }
+
+ currentHealthy := hc.tire2Client.HealthyNodes()
+ currentHealthySet := make(map[string]struct{})
+ for _, node := range currentHealthy {
+ currentHealthySet[node] = struct{}{}
+ }
+
+ hc.mu.Lock()
+ previousHealthy := hc.healthyNodes
+ hc.healthyNodes = currentHealthySet
+ hc.mu.Unlock()
+
+ // Detect nodes that came online (in current but not in previous)
+ for node := range currentHealthySet {
+ if _, wasHealthy := previousHealthy[node]; !wasHealthy {
+ select {
+ case hc.statusChangeChan <- nodeStatusChange{nodeName: node, isOnline: true}:
+ hc.l.Info().Str("node", node).Msg("detected node coming online")
+ default:
+ hc.l.Warn().Str("node", node).Msg("status change channel full")
+ }
+ }
+ }
+
+ // Detect nodes that went offline (in previous but not in current)
+ for node := range previousHealthy {
+ if _, isHealthy := currentHealthySet[node]; !isHealthy {
+ select {
+ case hc.statusChangeChan <- nodeStatusChange{nodeName: node, isOnline: false}:
+ hc.l.Info().Str("node", node).Msg("detected node going offline")
+ default:
+ hc.l.Warn().Str("node", node).Msg("status change channel full")
+ }
+ }
+ }
+}
+
+// handleStatusChanges processes node status changes.
+func (hc *handoffController) handleStatusChanges() {
+ defer hc.monitorWg.Done()
+
+ for {
+ select {
+ case change := <-hc.statusChangeChan:
+ if change.isOnline {
+ hc.onNodeOnline(change.nodeName)
+ } else {
+ hc.onNodeOffline(change.nodeName)
+ }
+ case <-hc.stopMonitor:
+ return
+ }
+ }
+}
+
+// onNodeOnline handles a node coming online.
+func (hc *handoffController) onNodeOnline(nodeName string) {
+ hc.mu.RLock()
+ _, hasQueue := hc.nodeQueues[nodeName]
+ hc.mu.RUnlock()
+
+ if !hasQueue {
+ hc.l.Debug().Str("node", nodeName).Msg("node online but no handoff queue")
+ return
+ }
+
+ // Trigger replay for this node
+ select {
+ case hc.replayTriggerChan <- nodeName:
+ hc.l.Info().Str("node", nodeName).Msg("triggered replay for recovered node")
+ default:
+ hc.l.Warn().Str("node", nodeName).Msg("replay trigger channel full")
+ }
+}
+
+// onNodeOffline handles a node going offline.
+func (hc *handoffController) onNodeOffline(nodeName string) {
+ hc.l.Info().Str("node", nodeName).Msg("node went offline")
+ // No immediate action needed - syncer will detect send failures
+}
+
+// isNodeHealthy checks if a specific node is currently healthy.
+func (hc *handoffController) isNodeHealthy(nodeName string) bool {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+ _, healthy := hc.healthyNodes[nodeName]
+ return healthy
+}
+
+// startReplayWorker starts the background replay worker goroutine.
+func (hc *handoffController) startReplayWorker() {
+ if hc.tire2Client == nil {
+ hc.l.Info().Msg("replay worker disabled (no tire2 client)")
+ return
+ }
+
+ hc.replayWg.Add(1)
+ go hc.replayWorkerLoop()
+
+ hc.l.Info().
+ Int("batchSize", hc.replayBatchSize).
+ Dur("pollInterval", hc.replayPollInterval).
+ Msg("replay worker started")
+}
+
+// replayWorkerLoop is the main replay worker loop.
+func (hc *handoffController) replayWorkerLoop() {
+ defer hc.replayWg.Done()
+
+ ticker := time.NewTicker(hc.replayPollInterval)
+ defer ticker.Stop()
+
+ // Track nodes that have been triggered for replay
+ triggeredNodes := make(map[string]struct{})
+
+ for {
+ select {
+ case nodeAddr := <-hc.replayTriggerChan:
+ // Node came online, mark for immediate processing
+ triggeredNodes[nodeAddr] = struct{}{}
+ hc.l.Debug().Str("node", nodeAddr).Msg("node marked for replay")
+
+ case <-ticker.C:
+ // Periodic check for work
+ nodesWithWork := hc.getNodesWithPendingParts()
+
+ // Process triggered nodes first, then round-robin through all nodes with work
+ nodesToProcess := make([]string, 0, len(triggeredNodes)+len(nodesWithWork))
+ for node := range triggeredNodes {
+ nodesToProcess = append(nodesToProcess, node)
+ }
+ for _, node := range nodesWithWork {
+ if _, alreadyTriggered := triggeredNodes[node]; !alreadyTriggered {
+ nodesToProcess = append(nodesToProcess, node)
+ }
+ }
+
+ // Process each node with pending parts (round-robin)
+ for _, nodeAddr := range nodesToProcess {
+ select {
+ case <-hc.replayStopChan:
+ return
+ default:
+ }
+
+ // Check if node is healthy
+ if !hc.isNodeHealthy(nodeAddr) {
+ continue
+ }
+
+ // Process a batch for this node
+ processed, err := hc.replayBatchForNode(nodeAddr, hc.replayBatchSize)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("replay batch failed")
+ }
+
+ // If we processed parts successfully, remove from triggered list
+ if processed > 0 {
+ delete(triggeredNodes, nodeAddr)
+ }
+
+ // If node has no more pending parts, remove from triggered list
+ pending, _ := hc.listPendingForNode(nodeAddr)
+ if len(pending) == 0 {
+ delete(triggeredNodes, nodeAddr)
+ }
+ }
+
+ case <-hc.replayStopChan:
+ return
+ }
+ }
+}
+
+// replayBatchForNode processes a batch of parts for a specific node.
+// Returns the number of parts successfully replayed and any error.
+func (hc *handoffController) replayBatchForNode(nodeAddr string, maxParts int) (int, error) {
+ // Get pending parts for this node
+ pending, err := hc.listPendingForNode(nodeAddr)
+ if err != nil {
+ return 0, fmt.Errorf("failed to list pending parts: %w", err)
+ }
+
+ if len(pending) == 0 {
+ return 0, nil
+ }
+
+ // Limit to batch size
+ if len(pending) > maxParts {
+ pending = pending[:maxParts]
+ }
+
+ successCount := 0
+ ctx := context.Background()
+
+ for _, ptp := range pending {
+ // Check if already in-flight
+ if hc.isInFlight(nodeAddr, ptp.PartID) {
+ hc.l.Debug().
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("skipping in-flight part")
+ continue
+ }
+
+ // Mark as in-flight
+ hc.markInFlight(nodeAddr, ptp.PartID, true)
+
+ // Read part from handoff queue
+ streamingPart, release, err := hc.readPartFromHandoff(nodeAddr, ptp.PartID, ptp.PartType)
+ if err != nil {
+ hc.l.Error().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("failed to read part from handoff")
+ hc.markInFlight(nodeAddr, ptp.PartID, false)
+ continue
+ }
+
+ // Send part to node
+ err = hc.sendPartToNode(ctx, nodeAddr, streamingPart)
+ release()
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("failed to send part during replay")
+ hc.markInFlight(nodeAddr, ptp.PartID, false)
+ continue
+ }
+
+ // Mark as complete
+ if err := hc.completeSend(nodeAddr, ptp.PartID, ptp.PartType); err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("failed to mark part as complete")
+ }
+
+ // Remove from in-flight
+ hc.markInFlight(nodeAddr, ptp.PartID, false)
+
+ successCount++
+
+ hc.l.Info().
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("successfully replayed part")
+ }
+
+ return successCount, nil
+}
+
+// markInFlight marks a part as being sent (or removes the mark).
+func (hc *handoffController) markInFlight(nodeAddr string, partID uint64, inFlight bool) {
+ hc.inFlightMu.Lock()
+ defer hc.inFlightMu.Unlock()
+
+ if inFlight {
+ // Add to in-flight set
+ if hc.inFlightSends[nodeAddr] == nil {
+ hc.inFlightSends[nodeAddr] = make(map[uint64]struct{})
+ }
+ hc.inFlightSends[nodeAddr][partID] = struct{}{}
+ } else {
+ // Remove from in-flight set
+ if nodeSet, exists := hc.inFlightSends[nodeAddr]; exists {
+ delete(nodeSet, partID)
+ if len(nodeSet) == 0 {
+ delete(hc.inFlightSends, nodeAddr)
+ }
+ }
+ }
+}
+
+// isInFlight checks if a part is currently being sent to a node.
+func (hc *handoffController) isInFlight(nodeAddr string, partID uint64) bool {
+ hc.inFlightMu.RLock()
+ defer hc.inFlightMu.RUnlock()
+
+ if nodeSet, exists := hc.inFlightSends[nodeAddr]; exists {
+ _, inFlight := nodeSet[partID]
+ return inFlight
+ }
+ return false
+}
+
+// getNodesWithPendingParts returns all node addresses that have pending parts in their queues.
+func (hc *handoffController) getNodesWithPendingParts() []string {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ var nodes []string
+ for nodeAddr, queue := range hc.nodeQueues {
+ pending, _ := queue.listPending()
+ if len(pending) > 0 {
+ nodes = append(nodes, nodeAddr)
+ }
+ }
+
+ return nodes
+}
+
+// readPartFromHandoff reads a part from the handoff queue and prepares it for sending.
+func (hc *handoffController) readPartFromHandoff(nodeAddr string, partID uint64, partType string) (*queue.StreamingPartData, func(), error) {
+ // Get the path to the hard-linked part
+ partPath := hc.getPartPath(nodeAddr, partID, partType)
+ if partPath == "" {
+ return nil, func() {}, fmt.Errorf("part not found in handoff queue")
+ }
+
+ // Get the metadata for this part
+ meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+ if err != nil {
+ return nil, func() {}, fmt.Errorf("failed to get part metadata: %w", err)
+ }
+
+ // Read files directly from the filesystem (both core and sidx parts)
+ // The handoff storage uses nested structure: <nodeRoot>/<partID>/<partType>/
+ entries := hc.fileSystem.ReadDir(partPath)
+ var files []queue.FileInfo
+ var buffers []*bytes.Buffer
+
+ for _, entry := range entries {
+ if entry.IsDir() || strings.HasPrefix(entry.Name(), ".") {
+ continue
+ }
+
+ streamName, include := mapStreamingFileName(partType, entry.Name())
+ if !include {
+ continue
+ }
+
+ filePath := filepath.Join(partPath, entry.Name())
+ data, err := hc.fileSystem.Read(filePath)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("file", entry.Name()).Msg("failed to read file")
+ continue
+ }
+
+ // Create a buffer and use its sequential reader
+ buf := bigValuePool.Generate()
+ buf.Buf = append(buf.Buf[:0], data...)
+ buffers = append(buffers, buf)
+
+ files = append(files, queue.FileInfo{
+ Name: streamName,
+ Reader: buf.SequentialRead(),
+ })
+ }
+
+ if len(files) == 0 {
+ for _, buf := range buffers {
+ bigValuePool.Release(buf)
+ }
+ return nil, func() {}, fmt.Errorf("no files found in part directory")
+ }
+
+ // Create streaming part data
+ streamingPart := &queue.StreamingPartData{
+ ID: partID,
+ Group: meta.Group,
+ ShardID: meta.ShardID,
+ Topic: data.TopicTracePartSync.String(),
+ Files: files,
+ PartType: partType,
+ }
+
+ // For core parts, read additional metadata from metadata.json if present
+ if partType == PartTypeCore {
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ if metadataBytes, err := hc.fileSystem.Read(metadataPath); err == nil {
+ var pm partMetadata
+ if err := json.Unmarshal(metadataBytes, &pm); err == nil {
+ streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+ streamingPart.UncompressedSizeBytes = pm.UncompressedSpanSizeBytes
+ streamingPart.TotalCount = pm.TotalCount
+ streamingPart.BlocksCount = pm.BlocksCount
+ streamingPart.MinTimestamp = pm.MinTimestamp
+ streamingPart.MaxTimestamp = pm.MaxTimestamp
+ }
+ }
+ }
+
+ release := func() {
+ for _, buf := range buffers {
+ bigValuePool.Release(buf)
+ }
+ }
+
+ return streamingPart, release, nil
+}
+
+func mapStreamingFileName(partType, fileName string) (string, bool) {
+ if partType == PartTypeCore {
+ switch fileName {
+ case primaryFilename:
+ return tracePrimaryName, true
+ case spansFilename:
+ return traceSpansName, true
+ case metaFilename:
+ return traceMetaName, true
+ case traceIDFilterFilename, tagTypeFilename:
+ return fileName, true
+ case metadataFilename:
+ return "", false
+ }
+
+ if strings.HasSuffix(fileName, tagsFilenameExt) {
+ tagName := strings.TrimSuffix(fileName, tagsFilenameExt)
+ return traceTagsPrefix + tagName, true
+ }
+ if strings.HasSuffix(fileName, tagsMetadataFilenameExt) {
+ tagName := strings.TrimSuffix(fileName, tagsMetadataFilenameExt)
+ return traceTagMetadataPrefix + tagName, true
+ }
+
+ return "", false
+ }
+
+ switch fileName {
+ case sidx.SidxPrimaryName + ".bin":
+ return sidx.SidxPrimaryName, true
+ case sidx.SidxDataName + ".bin":
+ return sidx.SidxDataName, true
+ case sidx.SidxKeysName + ".bin":
+ return sidx.SidxKeysName, true
+ case sidx.SidxMetaName + ".bin":
+ return sidx.SidxMetaName, true
+ case "manifest.json":
+ return "", false
+ }
+
+ if strings.HasSuffix(fileName, ".td") {
+ tagName := strings.TrimSuffix(fileName, ".td")
+ return sidx.TagDataPrefix + tagName, true
+ }
+ if strings.HasSuffix(fileName, ".tm") {
+ tagName := strings.TrimSuffix(fileName, ".tm")
+ return sidx.TagMetadataPrefix + tagName, true
+ }
+ if strings.HasSuffix(fileName, ".tf") {
+ tagName := strings.TrimSuffix(fileName, ".tf")
+ return sidx.TagFilterPrefix + tagName, true
+ }
+
+ return "", false
+}
+
+// sendPartToNode sends a single part to a node using ChunkedSyncClient.
+func (hc *handoffController) sendPartToNode(ctx context.Context, nodeAddr string, streamingPart *queue.StreamingPartData) error {
+ // Create chunked sync client
+ chunkedClient, err := hc.tire2Client.NewChunkedSyncClient(nodeAddr, 1024*1024)
+ if err != nil {
+ return fmt.Errorf("failed to create chunked sync client: %w", err)
+ }
+ defer chunkedClient.Close()
+
+ // Send the part
+ result, err := chunkedClient.SyncStreamingParts(ctx, []queue.StreamingPartData{*streamingPart})
+ if err != nil {
+ return fmt.Errorf("failed to sync streaming part: %w", err)
+ }
+
+ if !result.Success {
+ return fmt.Errorf("sync failed: %s", result.ErrorMessage)
+ }
+
+ hc.l.Debug().
+ Str("node", nodeAddr).
+ Uint64("partID", streamingPart.ID).
+ Str("partType", streamingPart.PartType).
+ Str("session", result.SessionID).
+ Uint64("bytes", result.TotalBytes).
+ Msg("part sent successfully during replay")
+
+ return nil
+}
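
The controller above caps the on-disk handoff area with maxTotalSizeBytes, tracked by canEnqueue and updateTotalSize. The behaviour is easier to see in isolation; the sizeGate type below is a simplified stand-alone rendition of that accounting (an illustration only, not code from this commit): a zero limit disables the check, and completions never underflow the running total.

package example

import "fmt"

// sizeGate mirrors the controller's size accounting in miniature.
type sizeGate struct {
	limit   uint64 // 0 disables the check, matching maxTotalSizeBytes == 0
	current uint64
}

// tryAdd rejects an enqueue that would push the total past the limit,
// otherwise it records the new part's size.
func (g *sizeGate) tryAdd(partSize uint64) error {
	if g.limit != 0 && g.current+partSize > g.limit {
		return fmt.Errorf("handoff queue full: current=%d, limit=%d, part=%d", g.current, g.limit, partSize)
	}
	g.current += partSize
	return nil
}

// remove subtracts a delivered part's size, guarding against underflow the
// same way updateTotalSize resets the counter to zero.
func (g *sizeGate) remove(partSize uint64) {
	if partSize > g.current {
		g.current = 0
		return
	}
	g.current -= partSize
}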
diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go
new file mode 100644
index 00000000..ec0610cd
--- /dev/null
+++ b/banyand/trace/handoff_replay_test.go
@@ -0,0 +1,406 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "path/filepath"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+const (
+ testNodeAddrPrimary = "node1.example.com:17912"
+ testNodeAddrSecondary = "node2.example.com:17912"
+)
+
+// simpleMockClient is a minimal mock for testing replay functionality.
+type simpleMockClient struct {
+ sendError error
+ healthyNodes []string
+ sendCalled int
+ mu sync.Mutex
+}
+
+func newSimpleMockClient(healthyNodes []string) *simpleMockClient {
+ return &simpleMockClient{
+ healthyNodes: healthyNodes,
+ }
+}
+
+func (m *simpleMockClient) HealthyNodes() []string {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.healthyNodes
+}
+
+func (m *simpleMockClient) NewChunkedSyncClient(node string, _ uint32) (queue.ChunkedSyncClient, error) {
+ return &simpleMockChunkedClient{
+ mockClient: m,
+ node: node,
+ }, nil
+}
+
+func (m *simpleMockClient) getSendCount() int {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.sendCalled
+}
+
+func (m *simpleMockClient) resetSendCount() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.sendCalled = 0
+}
+
+// simpleMockChunkedClient is a minimal mock for ChunkedSyncClient.
+type simpleMockChunkedClient struct {
+ mockClient *simpleMockClient
+ node string
+}
+
+func (m *simpleMockChunkedClient) SyncStreamingParts(_ context.Context, parts []queue.StreamingPartData) (*queue.SyncResult, error) {
+ m.mockClient.mu.Lock()
+ m.mockClient.sendCalled++
+ sendError := m.mockClient.sendError
+ m.mockClient.mu.Unlock()
+
+ if sendError != nil {
+ return nil, sendError
+ }
+ return &queue.SyncResult{
+ Success: true,
+ SessionID: "test-session",
+ TotalBytes: 1000,
+ PartsCount: uint32(len(parts)),
+ }, nil
+}
+
+func (m *simpleMockChunkedClient) Close() error {
+ return nil
+}
+
+// TestHandoffController_InFlightTracking tests the in-flight tracking mechanism.
+func TestHandoffController_InFlightTracking(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+ partID := uint64(0x10)
+
+ // Initially not in-flight
+ assert.False(t, controller.isInFlight(nodeAddr, partID))
+
+ // Mark as in-flight
+ controller.markInFlight(nodeAddr, partID, true)
+ assert.True(t, controller.isInFlight(nodeAddr, partID))
+
+ // Mark as not in-flight
+ controller.markInFlight(nodeAddr, partID, false)
+ assert.False(t, controller.isInFlight(nodeAddr, partID))
+
+ // Test multiple parts
+ partID2 := uint64(0x11)
+ controller.markInFlight(nodeAddr, partID, true)
+ controller.markInFlight(nodeAddr, partID2, true)
+ assert.True(t, controller.isInFlight(nodeAddr, partID))
+ assert.True(t, controller.isInFlight(nodeAddr, partID2))
+
+ // Remove one
+ controller.markInFlight(nodeAddr, partID, false)
+ assert.False(t, controller.isInFlight(nodeAddr, partID))
+ assert.True(t, controller.isInFlight(nodeAddr, partID2))
+}
+
+// TestHandoffController_GetNodesWithPendingParts tests the node enumeration.
+func TestHandoffController_GetNodesWithPendingParts(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x12)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ nodeAddr1 := testNodeAddrPrimary
+ nodeAddr2 := testNodeAddrSecondary
+ controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr1, nodeAddr2}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Initially no nodes with pending parts
+ nodes := controller.getNodesWithPendingParts()
+ assert.Empty(t, nodes)
+
+ // Enqueue for one node
+ err = controller.enqueueForNode(nodeAddr1, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+
+ nodes = controller.getNodesWithPendingParts()
+ assert.Len(t, nodes, 1)
+ assert.Contains(t, nodes, nodeAddr1)
+
+ // Enqueue for another node
+ partID2 := uint64(0x13)
+ sourcePath2 := createTestPart(t, fileSystem, sourceRoot, partID2)
+ err = controller.enqueueForNode(nodeAddr2, partID2, PartTypeCore, sourcePath2, "default", 0)
+ require.NoError(t, err)
+
+ nodes = controller.getNodesWithPendingParts()
+ assert.Len(t, nodes, 2)
+ assert.Contains(t, nodes, nodeAddr1)
+ assert.Contains(t, nodes, nodeAddr2)
+
+ // Complete one node's parts
+ err = controller.completeSend(nodeAddr1, partID, PartTypeCore)
+ require.NoError(t, err)
+
+ nodes = controller.getNodesWithPendingParts()
+ assert.Len(t, nodes, 1)
+ assert.Contains(t, nodes, nodeAddr2)
+}
+
+// TestHandoffController_ReadPartFromHandoff tests reading parts from handoff storage.
+func TestHandoffController_ReadPartFromHandoff(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x14)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ nodeAddr := testNodeAddrPrimary
+ controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Enqueue a part
+ err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+
+ // Read the part back
+ streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, partID, PartTypeCore)
+ require.NoError(t, err)
+ require.NotNil(t, streamingPart)
+ release()
+
+ // Verify basic fields
+ assert.Equal(t, partID, streamingPart.ID)
+ assert.Equal(t, "default", streamingPart.Group)
+ assert.Equal(t, PartTypeCore, streamingPart.PartType)
+ assert.NotEmpty(t, streamingPart.Files)
+}
+
+// TestHandoffController_ReplayBatchForNode tests the batch replay logic.
+func TestHandoffController_ReplayBatchForNode(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ nodeAddr := testNodeAddrPrimary
+ // Create mock client
+ mockClient := newSimpleMockClient([]string{nodeAddr})
+
+ // Create source parts
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+ controller, err := newHandoffController(fileSystem, tempDir, mockClient, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Enqueue multiple parts
+ numParts := 3
+ for i := 0; i < numParts; i++ {
+ partID := uint64(0x20 + i)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+ err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+ }
+
+ // Verify parts are pending
+ pending, err := controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, numParts)
+
+ // Replay batch (should process all 3 parts)
+ count, err := controller.replayBatchForNode(nodeAddr, 10)
+ require.NoError(t, err)
+ assert.Equal(t, numParts, count)
+ assert.Equal(t, numParts, mockClient.getSendCount())
+
+ // Verify all parts completed
+ pending, err = controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Empty(t, pending)
+}
+
+// TestHandoffController_ReplayBatchForNode_WithBatchLimit tests batch size limiting.
+func TestHandoffController_ReplayBatchForNode_WithBatchLimit(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ nodeAddr := testNodeAddrPrimary
+ // Create mock client
+ mockClient := newSimpleMockClient([]string{nodeAddr})
+
+ // Create source parts
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+ controller, err := newHandoffController(fileSystem, tempDir, mockClient, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Enqueue 5 parts
+ numParts := 5
+ for i := 0; i < numParts; i++ {
+ partID := uint64(0x30 + i)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+ err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+ }
+
+ // Replay with batch limit of 2
+ count, err := controller.replayBatchForNode(nodeAddr, 2)
+ require.NoError(t, err)
+ assert.Equal(t, 2, count)
+ assert.Equal(t, 2, mockClient.getSendCount())
+
+ // Verify 3 parts still pending
+ pending, err := controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, 3)
+
+ // Replay again
+ mockClient.resetSendCount()
+ count, err = controller.replayBatchForNode(nodeAddr, 2)
+ require.NoError(t, err)
+ assert.Equal(t, 2, count)
+
+ // Verify 1 part still pending
+ pending, err = controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, 1)
+}
+
+// TestHandoffController_ReplayBatchForNode_SkipsInFlight tests that in-flight parts are skipped.
+func TestHandoffController_ReplayBatchForNode_SkipsInFlight(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ nodeAddr := testNodeAddrPrimary
+ // Create mock client
+ mockClient := newSimpleMockClient([]string{nodeAddr})
+
+ // Create source parts
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+ controller, err := newHandoffController(fileSystem, tempDir, mockClient, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Enqueue parts
+ partID1 := uint64(0x40)
+ partID2 := uint64(0x41)
+ sourcePath1 := createTestPart(t, fileSystem, sourceRoot, partID1)
+ sourcePath2 := createTestPart(t, fileSystem, sourceRoot, partID2)
+
+ err = controller.enqueueForNode(nodeAddr, partID1, PartTypeCore, sourcePath1, "default", 0)
+ require.NoError(t, err)
+ err = controller.enqueueForNode(nodeAddr, partID2, PartTypeCore, sourcePath2, "default", 0)
+ require.NoError(t, err)
+
+ // Mark first part as in-flight
+ controller.markInFlight(nodeAddr, partID1, true)
+
+ // Replay should skip first part and only process second
+ count, err := controller.replayBatchForNode(nodeAddr, 10)
+ require.NoError(t, err)
+ assert.Equal(t, 1, count)
+
+ // First part should still be pending
+ pending, err := controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, 1)
+ assert.Equal(t, partID1, pending[0].PartID)
+}
+
+// TestHandoffController_SendPartToNode tests sending a part to a node.
+func TestHandoffController_SendPartToNode(t *testing.T) {
+ tempDir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ nodeAddr := testNodeAddrPrimary
+ // Create mock client
+ mockClient := newSimpleMockClient([]string{nodeAddr})
+
+ controller, err := newHandoffController(fileSystem, tempDir, mockClient, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+ defer controller.close()
+
+ // Create a streaming part
+ streamingPart := &queue.StreamingPartData{
+ ID: 0x50,
+ Group: "default",
+ ShardID: 0,
+ PartType: PartTypeCore,
+ Files: []queue.FileInfo{},
+ }
+
+ // Send the part
+ err = controller.sendPartToNode(context.Background(), testNodeAddrPrimary, streamingPart)
+ require.NoError(t, err)
+ assert.Equal(t, 1, mockClient.getSendCount())
+}
diff --git a/banyand/trace/handoff_storage.go b/banyand/trace/handoff_storage.go
new file mode 100644
index 00000000..b1ccc5b2
--- /dev/null
+++ b/banyand/trace/handoff_storage.go
@@ -0,0 +1,373 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "encoding/json"
+ "fmt"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "sync"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ handoffMetaFilename = ".handoff_meta"
+ nodeInfoFilename = ".node_info"
+)
+
+// handoffMetadata contains metadata for a handoff queue entry.
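+// An illustrative sidecar payload (field names follow the JSON tags below; values are made up):
+//
+//	{"group":"default","part_type":"core","enqueue_timestamp":1730600000000000000,"part_size_bytes":5242880,"shard_id":0}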
+type handoffMetadata struct {
+ Group string `json:"group"`
+ PartType string `json:"part_type"`
+ EnqueueTimestamp int64 `json:"enqueue_timestamp"`
+ PartSizeBytes uint64 `json:"part_size_bytes"`
+ ShardID uint32 `json:"shard_id"`
+}
+
+// handoffNodeQueue manages the handoff queue for a single data node.
+// It uses a per-node directory with hard-linked part directories.
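+// The resulting on-disk layout (illustrative; the controller derives the per-node
+// root via sanitizeNodeAddr, and part directories are named by partName) is:
+//
+//	<root>/
+//	    .node_info                # original node address
+//	    000000000000001a/         # partName(partID)
+//	        core/                 # one sub-directory per part type
+//	            .handoff_meta     # JSON handoffMetadata sidecar
+//	            <hard-linked part files>
+//	        sidx_trace_id/
+//	            ...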
+type handoffNodeQueue struct {
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ nodeAddr string
+ root string
+ mu sync.RWMutex
+}
+
+// newHandoffNodeQueue creates a new handoff queue for a specific node.
+func newHandoffNodeQueue(nodeAddr, root string, fileSystem fs.FileSystem, l *logger.Logger) (*handoffNodeQueue, error) {
+ if fileSystem == nil {
+ return nil, fmt.Errorf("fileSystem is nil")
+ }
+ if l == nil {
+ return nil, fmt.Errorf("logger is nil")
+ }
+ if nodeAddr == "" {
+ return nil, fmt.Errorf("node address is empty")
+ }
+ if root == "" {
+ return nil, fmt.Errorf("queue root path is empty")
+ }
+
+ hnq := &handoffNodeQueue{
+ nodeAddr: nodeAddr,
+ root: root,
+ fileSystem: fileSystem,
+ l: l,
+ }
+
+ // Create node queue directory if it doesn't exist
+ fileSystem.MkdirIfNotExist(root, storage.DirPerm)
+
+ // Write node info file to persist the original node address
+ if err := hnq.writeNodeInfo(); err != nil {
+ l.Warn().Err(err).Msg("failed to write node info")
+ }
+
+ return hnq, nil
+}
+
+// writeNodeInfo writes the original node address to a metadata file.
+func (hnq *handoffNodeQueue) writeNodeInfo() error {
+ nodeInfoPath := filepath.Join(hnq.root, nodeInfoFilename)
+
+ // Check if already exists
+ if _, err := hnq.fileSystem.Read(nodeInfoPath); err == nil {
+ return nil // Already exists
+ }
+
+ lf, err := hnq.fileSystem.CreateLockFile(nodeInfoPath, storage.FilePerm)
+ if err != nil {
+ return fmt.Errorf("failed to create node info file: %w", err)
+ }
+
+ _, err = lf.Write([]byte(hnq.nodeAddr))
+ return err
+}
+
+// readNodeInfo reads the original node address from the metadata file.
+func readNodeInfo(fileSystem fs.FileSystem, root string) (string, error) {
+ nodeInfoPath := filepath.Join(root, nodeInfoFilename)
+ data, err := fileSystem.Read(nodeInfoPath)
+ if err != nil {
+ return "", fmt.Errorf("failed to read node info: %w", err)
+ }
+ return string(data), nil
+}
+
+// enqueue adds a part to the handoff queue by creating hard links and writing metadata.
+// Uses nested structure: <nodeRoot>/<partId>/<partType>/.
+func (hnq *handoffNodeQueue) enqueue(partID uint64, partType string, sourcePath string, meta *handoffMetadata) error {
+ hnq.mu.Lock()
+ defer hnq.mu.Unlock()
+
+ dstPath := hnq.getPartTypePath(partID, partType)
+
+ // Check if already exists (idempotent)
+ partIDDir := hnq.getPartIDDir(partID)
+
+ // Create parent directory for partID if it doesn't exist
+ hnq.fileSystem.MkdirIfNotExist(partIDDir, storage.DirPerm)
+
+ // Now check if part type already exists
+ partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+ for _, entry := range partTypeEntries {
+ if entry.Name() == partType && entry.IsDir() {
+ return nil // Already enqueued
+ }
+ }
+
+ // Create hardlinks directly to destination
+	if err := copyPartDirectory(hnq.fileSystem, sourcePath, dstPath); err != nil {
+ // Clean up partial copy on failure
+ hnq.fileSystem.MustRMAll(dstPath)
+ return fmt.Errorf("failed to hardlink part: %w", err)
+ }
+
+ // Write sidecar metadata file after hardlinks are created
+ metaPath := filepath.Join(dstPath, handoffMetaFilename)
+ metaBytes, err := json.Marshal(meta)
+ if err != nil {
+ hnq.fileSystem.MustRMAll(dstPath)
+ return fmt.Errorf("failed to marshal metadata: %w", err)
+ }
+
+ lf, err := hnq.fileSystem.CreateLockFile(metaPath, storage.FilePerm)
+ if err != nil {
+ hnq.fileSystem.MustRMAll(dstPath)
+ return fmt.Errorf("failed to create metadata file: %w", err)
+ }
+ if _, err := lf.Write(metaBytes); err != nil {
+ hnq.fileSystem.MustRMAll(dstPath)
+ return fmt.Errorf("failed to write metadata: %w", err)
+ }
+
+ hnq.l.Info().
+ Str("node", hnq.nodeAddr).
+ Uint64("partId", partID).
+ Str("partType", partType).
+ Str("path", dstPath).
+ Msg("part enqueued to handoff queue")
+
+ return nil
+}
+
+// partTypePair represents a pending part with its type.
+type partTypePair struct {
+ PartType string
+ PartID uint64
+}
+
+// listPending returns a sorted list of all pending part IDs with their types.
+func (hnq *handoffNodeQueue) listPending() ([]partTypePair, error) {
+ hnq.mu.RLock()
+ defer hnq.mu.RUnlock()
+
+ entries := hnq.fileSystem.ReadDir(hnq.root)
+ var pairs []partTypePair
+
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+
+ // Skip temp directories and metadata files
+		if filepath.Ext(entry.Name()) == ".tmp" || entry.Name() == nodeInfoFilename {
+			continue
+		}
+
+		// Parse partId from hex directory name
+		partID, err := parsePartID(entry.Name())
+		if err != nil {
+			hnq.l.Warn().Str("name", entry.Name()).Msg("invalid part directory")
+ continue
+ }
+
+ // List part types under this partId
+ partIDDir := filepath.Join(hnq.root, entry.Name())
+ partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+ for _, partTypeEntry := range partTypeEntries {
+ if partTypeEntry.IsDir() {
+ pairs = append(pairs, partTypePair{
+ PartID: partID,
+ PartType: partTypeEntry.Name(),
+ })
+ }
+ }
+ }
+
+ // Sort by partID first, then by partType
+ sort.Slice(pairs, func(i, j int) bool {
+ if pairs[i].PartID == pairs[j].PartID {
+ return pairs[i].PartType < pairs[j].PartType
+ }
+ return pairs[i].PartID < pairs[j].PartID
+ })
+
+ return pairs, nil
+}
+
+// getMetadata reads the handoff metadata for a specific part type.
+func (hnq *handoffNodeQueue) getMetadata(partID uint64, partType string) (*handoffMetadata, error) {
+	hnq.mu.RLock()
+	defer hnq.mu.RUnlock()
+
+	metaPath := filepath.Join(hnq.getPartTypePath(partID, partType), handoffMetaFilename)
+ data, err := hnq.fileSystem.Read(metaPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read metadata: %w", err)
+ }
+
+ var meta handoffMetadata
+ if err := json.Unmarshal(data, &meta); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
+ }
+
+ return &meta, nil
+}
+
+// getPartIDDir returns the directory path for a partID (contains all part types).
+func (hnq *handoffNodeQueue) getPartIDDir(partID uint64) string {
+	return filepath.Join(hnq.root, partName(partID))
+}
+
+// getPartTypePath returns the full path to a specific part type directory.
+func (hnq *handoffNodeQueue) getPartTypePath(partID uint64, partType string) string {
+	return filepath.Join(hnq.getPartIDDir(partID), partType)
+}
+
+// complete removes a specific part type from the handoff queue after successful delivery.
+func (hnq *handoffNodeQueue) complete(partID uint64, partType string) error {
+ hnq.mu.Lock()
+ defer hnq.mu.Unlock()
+
+ dstPath := hnq.getPartTypePath(partID, partType)
+
+ // Remove the part type directory
+ hnq.fileSystem.MustRMAll(dstPath)
+
+ // Check if partID directory is now empty, if so remove it too
+ partIDDir := hnq.getPartIDDir(partID)
+ entries := hnq.fileSystem.ReadDir(partIDDir)
+ isEmpty := true
+ for _, entry := range entries {
+ if entry.IsDir() {
+ isEmpty = false
+ break
+ }
+ }
+ if isEmpty {
+ hnq.fileSystem.MustRMAll(partIDDir)
+ }
+
+ hnq.l.Info().
+ Str("node", hnq.nodeAddr).
+ Uint64("partId", partID).
+ Str("partType", partType).
+ Msg("part type removed from handoff queue")
+
+ return nil
+}
+
+// completeAll removes all part types for a given partID.
+func (hnq *handoffNodeQueue) completeAll(partID uint64) error {
+ hnq.mu.Lock()
+ defer hnq.mu.Unlock()
+
+ partIDDir := hnq.getPartIDDir(partID)
+
+ // Simply remove the entire partID directory
+ hnq.fileSystem.MustRMAll(partIDDir)
+
+ hnq.l.Info().
+ Str("node", hnq.nodeAddr).
+ Uint64("partId", partID).
+ Msg("all part types removed from handoff queue")
+
+ return nil
+}
+
+// size returns the total size of all pending parts in bytes.
+func (hnq *handoffNodeQueue) size() (uint64, error) {
+ hnq.mu.RLock()
+ defer hnq.mu.RUnlock()
+
+ partIDEntries := hnq.fileSystem.ReadDir(hnq.root)
+ var totalSize uint64
+
+ for _, partIDEntry := range partIDEntries {
+		if !partIDEntry.IsDir() || partIDEntry.Name() == nodeInfoFilename {
+ continue
+ }
+
+ // For each partID directory
+ partIDDir := filepath.Join(hnq.root, partIDEntry.Name())
+ partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+
+ for _, partTypeEntry := range partTypeEntries {
+ if !partTypeEntry.IsDir() {
+ continue
+ }
+
+ // For each part type directory
+			partTypePath := filepath.Join(partIDDir, partTypeEntry.Name())
+			partFiles := hnq.fileSystem.ReadDir(partTypePath)
+
+			for _, partFile := range partFiles {
+				if partFile.IsDir() {
+					continue
+				}
+
+				filePath := filepath.Join(partTypePath, partFile.Name())
+ file, err := hnq.fileSystem.OpenFile(filePath)
+ if err != nil {
+ continue // Skip files we can't open
+ }
+ size, _ := file.Size()
+ fs.MustClose(file)
+ if size > 0 {
+ totalSize += uint64(size)
+ }
+ }
+ }
+ }
+
+ return totalSize, nil
+}
+
+// copyPartDirectory creates hardlinks from srcDir to dstDir using fs.CreateHardLink.
+func copyPartDirectory(fileSystem fs.FileSystem, srcDir, dstDir string) error {
+	if err := fileSystem.CreateHardLink(srcDir, dstDir, nil); err != nil {
+		return fmt.Errorf("failed to create hardlinks from %s to %s: %w", srcDir, dstDir, err)
+ }
+ return nil
+}
+
+// parsePartID parses a part ID from a hex string directory name.
+func parsePartID(name string) (uint64, error) {
+ partID, err := strconv.ParseUint(name, 16, 64)
+ if err != nil {
+ return 0, fmt.Errorf("cannot parse part ID %s: %w", name, err)
+ }
+ return partID, nil
+}
diff --git a/banyand/trace/handoff_storage_test.go b/banyand/trace/handoff_storage_test.go
new file mode 100644
index 00000000..93084aff
--- /dev/null
+++ b/banyand/trace/handoff_storage_test.go
@@ -0,0 +1,692 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "encoding/json"
+ "errors"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+const megabyte = 1024 * 1024
+
+type fakeQueueClient struct {
+ healthy []string
+}
+
+func (f *fakeQueueClient) HealthyNodes() []string {
+ return append([]string(nil), f.healthy...)
+}
+
+func (f *fakeQueueClient) NewChunkedSyncClient(string, uint32) (queue.ChunkedSyncClient, error) {
+ return nil, errors.New("not implemented")
+}
+
+func setupHandoffTest(t *testing.T) (string, fs.FileSystem, *logger.Logger) {
+ tempDir, deferFn := test.Space(require.New(t))
+ t.Cleanup(deferFn)
+
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("handoff-test")
+
+ return tempDir, fileSystem, l
+}
+
+func createTestPart(t *testing.T, fileSystem fs.FileSystem, root string, partID uint64) string {
+ partPath := filepath.Join(root, partName(partID))
+ fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+ // Create test files
+ files := map[string][]byte{
+ "meta.bin": []byte("test meta data"),
+ "primary.bin": []byte("test primary data"),
+ "spans.bin": []byte("test spans data"),
+ "metadata.json": []byte(`{"id":1,"totalCount":100}`),
+ "tag.type": []byte("test tag type"),
+ "traceID.filter": []byte("test filter"),
+ }
+
+ for filename, content := range files {
+ filePath := filepath.Join(partPath, filename)
+ lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+ require.NoError(t, err)
+ _, err = lf.Write(content)
+ require.NoError(t, err)
+ }
+
+ return partPath
+}
+
+func TestHandoffNodeQueue_EnqueueCore(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1a)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff node queue
+ queueRoot := filepath.Join(tempDir, "handoff", "node1")
+	nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", queueRoot, fileSystem, l)
+ require.NoError(t, err)
+
+ // Test enqueue core part
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: "default",
+ ShardID: 0,
+ PartType: PartTypeCore,
+ }
+
+ err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+ require.NoError(t, err)
+
+ // Verify nested structure: <partID>/core/
+ dstPath := nodeQueue.getPartTypePath(partID, PartTypeCore)
+ entries := fileSystem.ReadDir(dstPath)
+ assert.NotEmpty(t, entries)
+
+ // Verify metadata file exists
+ metaPath := filepath.Join(dstPath, handoffMetaFilename)
+ metaData, err := fileSystem.Read(metaPath)
+ require.NoError(t, err)
+ assert.NotEmpty(t, metaData)
+}
+
+func TestHandoffNodeQueue_EnqueueMultiplePartTypes(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source parts
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1b)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff node queue
+ queueRoot := filepath.Join(tempDir, "handoff", "node1")
+	nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", queueRoot, fileSystem, l)
+ require.NoError(t, err)
+
+ // Enqueue core part
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: "default",
+ ShardID: 0,
+ PartType: PartTypeCore,
+ }
+ err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+ require.NoError(t, err)
+
+ // Enqueue sidx part (simulated with same source)
+ meta.PartType = "sidx_trace_id"
+ err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+ require.NoError(t, err)
+
+ // Enqueue another sidx part
+ meta.PartType = "sidx_service"
+ err = nodeQueue.enqueue(partID, "sidx_service", sourcePath, meta)
+ require.NoError(t, err)
+
+ // List pending - should have 3 part types for same partID
+ pending, err := nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 3)
+
+ // Verify all belong to same partID
+ for _, pair := range pending {
+ assert.Equal(t, partID, pair.PartID)
+ }
+
+ // Verify part types
+ partTypes := make(map[string]bool)
+ for _, pair := range pending {
+ partTypes[pair.PartType] = true
+ }
+ assert.True(t, partTypes[PartTypeCore])
+ assert.True(t, partTypes["sidx_trace_id"])
+ assert.True(t, partTypes["sidx_service"])
+}
+
+func TestHandoffNodeQueue_Complete(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1c)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff node queue
+ queueRoot := filepath.Join(tempDir, "handoff", "node1")
+	nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", queueRoot, fileSystem, l)
+ require.NoError(t, err)
+
+ // Enqueue core and sidx parts
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: "default",
+ ShardID: 0,
+ PartType: PartTypeCore,
+ }
+ err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+ require.NoError(t, err)
+
+ meta.PartType = "sidx_trace_id"
+ err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+ require.NoError(t, err)
+
+ // Verify both are pending
+ pending, err := nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 2)
+
+ // Complete core part only
+ err = nodeQueue.complete(partID, PartTypeCore)
+ require.NoError(t, err)
+
+ // Verify only sidx remains
+ pending, err = nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 1)
+ assert.Equal(t, "sidx_trace_id", pending[0].PartType)
+
+ // Complete remaining part
+ err = nodeQueue.complete(partID, "sidx_trace_id")
+ require.NoError(t, err)
+
+ // Verify nothing pending
+ pending, err = nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 0)
+
+ // Verify partID directory is also removed
+ partIDDir := nodeQueue.getPartIDDir(partID)
+ _, err = os.Stat(partIDDir)
+ assert.True(t, os.IsNotExist(err))
+}
+
+func TestHandoffNodeQueue_CompleteAll(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1d)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff node queue
+ queueRoot := filepath.Join(tempDir, "handoff", "node1")
+	nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", queueRoot, fileSystem, l)
+ require.NoError(t, err)
+
+ // Enqueue multiple part types
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: "default",
+ ShardID: 0,
+ PartType: PartTypeCore,
+ }
+ err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+ require.NoError(t, err)
+ err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+ require.NoError(t, err)
+ err = nodeQueue.enqueue(partID, "sidx_service", sourcePath, meta)
+ require.NoError(t, err)
+
+ // Verify all are pending
+ pending, err := nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 3)
+
+ // Complete all at once
+ err = nodeQueue.completeAll(partID)
+ require.NoError(t, err)
+
+ // Verify nothing pending
+ pending, err = nodeQueue.listPending()
+ require.NoError(t, err)
+ assert.Len(t, pending, 0)
+}
+
+func TestHandoffController_EnqueueForNodes(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1e)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff controller
+	offlineNodes := []string{"node1.example.com:17912", "node2.example.com:17912"}
+	controller, err := newHandoffController(fileSystem, tempDir, nil, offlineNodes, 0, l, nil)
+	require.NoError(t, err)
+
+	// Enqueue for multiple nodes
+	err = controller.enqueueForNodes(offlineNodes, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+
+ // Verify both nodes have the part
+ for _, nodeAddr := range offlineNodes {
+ pending, err := controller.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, 1)
+ assert.Equal(t, partID, pending[0].PartID)
+ assert.Equal(t, PartTypeCore, pending[0].PartType)
+ }
+}
+
+func TestHandoffController_GetPartPath(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x1f)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ // Create handoff controller
+ nodeAddr := "node1.example.com:17912"
+	controller, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil)
+	require.NoError(t, err)
+
+	// Enqueue for node
+	err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+
+ // Get part path
+ partPath := controller.getPartPath(nodeAddr, partID, PartTypeCore)
+ assert.NotEmpty(t, partPath)
+
+ // Verify path exists
+ _, err = os.Stat(partPath)
+ require.NoError(t, err)
+
+ // Verify it's the nested structure
+ assert.Contains(t, partPath, partName(partID))
+ assert.Contains(t, partPath, PartTypeCore)
+}
+
+func TestHandoffController_LoadExistingQueues(t *testing.T) {
+ tempDir, fileSystem, l := setupHandoffTest(t)
+
+ // Create source part
+ sourceRoot := filepath.Join(tempDir, "source")
+ fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+ partID := uint64(0x20)
+ sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+ nodeAddr := "node1.example.com:17912"
+
+ // Create first controller and enqueue part
+	controller1, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil)
+	require.NoError(t, err)
+
+	err = controller1.enqueueForNode(nodeAddr, partID, PartTypeCore, sourcePath, "default", 0)
+ require.NoError(t, err)
+
+ // Close first controller
+ err = controller1.close()
+ require.NoError(t, err)
+
+ // Create second controller (should load existing queues)
+	controller2, err := newHandoffController(fileSystem, tempDir, nil, []string{nodeAddr}, 0, l, nil)
+ require.NoError(t, err)
+
+ // Verify part is still pending
+ pending, err := controller2.listPendingForNode(nodeAddr)
+ require.NoError(t, err)
+ assert.Len(t, pending, 1)
+ assert.Equal(t, partID, pending[0].PartID)
+ assert.Equal(t, PartTypeCore, pending[0].PartType)
+}
+
+func TestSanitizeNodeAddr(t *testing.T) {
+ tests := []struct {
+ input string
+ expected string
+ }{
+ {"node1.example.com:17912", "node1.example.com_17912"},
+ {"node2:8080", "node2_8080"},
+ {"192.168.1.1:9999", "192.168.1.1_9999"},
+ {"node/with/slashes", "node_with_slashes"},
+ {"node\\with\\backslashes", "node_with_backslashes"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.input, func(t *testing.T) {
+ result := sanitizeNodeAddr(tt.input)
+ assert.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestParsePartID(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ expected uint64
+ expectErr bool
+ }{
+ {"valid hex", "000000000000001a", 0x1a, false},
+ {"valid hex upper", "000000000000001A", 0x1a, false},
+ {"simple hex", "1a", 0x1a, false},
+ {"invalid hex", "xyz", 0, true},
+ {"empty string", "", 0, true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result, err := parsePartID(tt.input)
+ if tt.expectErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expected, result)
+ }
+ })
+ }
+}
+
+// TestHandoffController_SizeEnforcement verifies that the handoff queue rejects enqueues when the size limit is exceeded.
+func TestHandoffController_SizeEnforcement(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ // Create a mock part with metadata
+ partID := uint64(100)
+ partPath := filepath.Join(tempDir, "source", partName(partID))
+ err := os.MkdirAll(partPath, 0o755)
+ tester.NoError(err)
+
+ // Create metadata.json with CompressedSizeBytes = 5MB
+ metadata := map[string]interface{}{
+ "compressedSizeBytes": 5 * 1024 * 1024, // 5MB
+ }
+ metadataBytes, err := json.Marshal(metadata)
+ tester.NoError(err)
+	err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+	tester.NoError(err)
+
+	// Create a dummy file
+	err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+ tester.NoError(err)
+
+ // Create handoff controller with 10MB limit
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ dataNodes := []string{"node1:17912", "node2:17912"}
+	hc, err := newHandoffController(lfs, tempDir, nil, dataNodes, 10*megabyte, l, nil) // 10MB limit
+ tester.NoError(err)
+ defer hc.close()
+
+ // First enqueue should succeed (5MB < 10MB)
+	err = hc.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+
+ // Check total size
+ totalSize := hc.getTotalSize()
+ tester.Equal(uint64(5*1024*1024), totalSize)
+
+ // Second enqueue should succeed (10MB = 10MB)
+	err = hc.enqueueForNode("node1:17912", partID+1, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+
+ // Check total size
+ totalSize = hc.getTotalSize()
+ tester.Equal(uint64(10*1024*1024), totalSize)
+
+ // Third enqueue should fail (15MB > 10MB)
+	err = hc.enqueueForNode("node1:17912", partID+2, PartTypeCore, partPath, "group1", 1)
+ tester.Error(err)
+ tester.Contains(err.Error(), "handoff queue full")
+
+ // Total size should still be 10MB
+ totalSize = hc.getTotalSize()
+ tester.Equal(uint64(10*1024*1024), totalSize)
+}
+
+// TestHandoffController_SizeTracking verifies that total size is correctly updated on enqueue and complete.
+func TestHandoffController_SizeTracking(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ // Create a mock part with metadata
+ partID := uint64(200)
+ partPath := filepath.Join(tempDir, "source", partName(partID))
+ err := os.MkdirAll(partPath, 0o755)
+ tester.NoError(err)
+
+ // Create metadata.json with CompressedSizeBytes = 3MB
+ metadata := map[string]interface{}{
+ "compressedSizeBytes": 3 * 1024 * 1024, // 3MB
+ }
+ metadataBytes, err := json.Marshal(metadata)
+ tester.NoError(err)
+	err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+	tester.NoError(err)
+
+	// Create a dummy file
+	err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+ tester.NoError(err)
+
+ // Create handoff controller with 100MB limit
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ dataNodes := []string{"node1:17912"}
+	hc, err := newHandoffController(lfs, tempDir, nil, dataNodes, 100*megabyte, l, nil) // 100MB limit
+ tester.NoError(err)
+ defer hc.close()
+
+ // Initial size should be 0
+ tester.Equal(uint64(0), hc.getTotalSize())
+
+ // Enqueue first part
+	err = hc.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+ tester.Equal(uint64(3*1024*1024), hc.getTotalSize())
+
+ // Enqueue second part
+	err = hc.enqueueForNode("node1:17912", partID+1, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+ tester.Equal(uint64(6*1024*1024), hc.getTotalSize())
+
+ // Complete first part
+ err = hc.completeSend("node1:17912", partID, PartTypeCore)
+ tester.NoError(err)
+ tester.Equal(uint64(3*1024*1024), hc.getTotalSize())
+
+ // Complete second part
+ err = hc.completeSend("node1:17912", partID+1, PartTypeCore)
+ tester.NoError(err)
+ tester.Equal(uint64(0), hc.getTotalSize())
+}
+
+func TestHandoffController_NodeQueueHelpers(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ const (
+ partID = uint64(400)
+ node = "node1:17912"
+ )
+
+ partPath := filepath.Join(tempDir, "source", partName(partID))
+ err := os.MkdirAll(partPath, 0o755)
+ tester.NoError(err)
+
+ metadata := map[string]interface{}{
+ "compressedSizeBytes": 1024,
+ }
+ metadataBytes, err := json.Marshal(metadata)
+ tester.NoError(err)
+	err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+	tester.NoError(err)
+	err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("helper test"), 0o600)
+ tester.NoError(err)
+
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+	hc, err := newHandoffController(lfs, tempDir, nil, []string{node}, 10*megabyte, l, nil)
+ tester.NoError(err)
+ defer hc.close()
+
+	err = hc.enqueueForNode(node, partID, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+
+ nodes := hc.getAllNodeQueues()
+ tester.Contains(nodes, node)
+
+ queueSize, err := hc.getNodeQueueSize(node)
+ tester.NoError(err)
+ tester.Greater(queueSize, uint64(0))
+
+ err = hc.completeSendAll(node, partID)
+ tester.NoError(err)
+
+ queueSize, err = hc.getNodeQueueSize(node)
+ tester.NoError(err)
+ tester.Equal(uint64(0), queueSize)
+}
+
+func TestHandoffController_FiltersNonOwningOfflineNodes(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ dataNodes := []string{"node1:17912", "node2:17912"}
+ const groupName = "group1"
+ const shardID uint32 = 0
+
+ resolver := func(group string, shard uint32) ([]string, error) {
+ tester.Equal(groupName, group)
+ tester.Equal(shardID, shard)
+ return []string{"node2:17912"}, nil
+ }
+
+ queueClient := &fakeQueueClient{healthy: dataNodes}
+	hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 0, l, resolver)
+ tester.NoError(err)
+ defer hc.close()
+
+	offline := hc.calculateOfflineNodes([]string{"node2:17912"}, groupName, common.ShardID(shardID))
+	tester.Len(offline, 0, "expected no offline nodes when shard owner is online")
+
+ partDir := filepath.Join(tempDir, "part-core")
+ tester.NoError(os.MkdirAll(partDir, 0o755))
+
+ coreParts := []partInfo{{
+ partID: uint64(1),
+ path: partDir,
+ group: groupName,
+ shardID: common.ShardID(shardID),
+ }}
+
+ err = hc.enqueueForOfflineNodes([]string{"node1:17912"}, coreParts, nil)
+ tester.NoError(err)
+
+ nodeDir := filepath.Join(hc.root, sanitizeNodeAddr("node1:17912"))
+ _, statErr := os.Stat(nodeDir)
+ tester.Error(statErr)
+	tester.True(os.IsNotExist(statErr), "expected no queue directory to be created for non-owning node")
+}
+
+// TestHandoffController_SizeRecovery verifies that total size is correctly calculated on restart.
+func TestHandoffController_SizeRecovery(t *testing.T) {
+ tester := require.New(t)
+ tempDir, deferFunc := test.Space(tester)
+ defer deferFunc()
+
+ // Create a mock part with metadata
+ partID := uint64(300)
+ partPath := filepath.Join(tempDir, "source", partName(partID))
+ err := os.MkdirAll(partPath, 0o755)
+ tester.NoError(err)
+
+ // Create metadata.json with CompressedSizeBytes = 7MB
+ metadata := map[string]interface{}{
+ "compressedSizeBytes": 7 * 1024 * 1024, // 7MB
+ }
+ metadataBytes, err := json.Marshal(metadata)
+ tester.NoError(err)
+	err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+	tester.NoError(err)
+
+	// Create a dummy file
+	err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+ tester.NoError(err)
+
+ lfs := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ dataNodes := []string{"node1:17912", "node2:17912"}
+
+ // First controller: enqueue some parts
+	hc1, err := newHandoffController(lfs, tempDir, nil, dataNodes, 100*megabyte, l, nil)
+	tester.NoError(err)
+
+	err = hc1.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, "group1", 1)
+	tester.NoError(err)
+	err = hc1.enqueueForNode("node2:17912", partID+1, PartTypeCore, partPath, "group1", 1)
+ tester.NoError(err)
+
+ // Total size should be 14MB (7MB * 2 parts)
+ expectedSize := uint64(14 * 1024 * 1024)
+ tester.Equal(expectedSize, hc1.getTotalSize())
+
+ // Close first controller
+ err = hc1.close()
+ tester.NoError(err)
+
+ // Second controller: should recover the same size
+	hc2, err := newHandoffController(lfs, tempDir, nil, dataNodes, 100*megabyte, l, nil)
+ tester.NoError(err)
+ defer hc2.close()
+
+ // Recovered size should match
+ recoveredSize := hc2.getTotalSize()
+ tester.Equal(expectedSize, recoveredSize)
+
+ // Should have both nodes with pending parts
+ node1Pending, err := hc2.listPendingForNode("node1:17912")
+ tester.NoError(err)
+ tester.Len(node1Pending, 1)
+
+ node2Pending, err := hc2.listPendingForNode("node2:17912")
+ tester.NoError(err)
+ tester.Len(node2Pending, 1)
+}
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 9849d81f..cf435393 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue/pub"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -375,6 +376,7 @@ type queueSupplier struct {
traceDataNodeRegistry grpc.NodeRegistry
l *logger.Logger
schemaRepo *schemaRepo
+ handoffCtrl *handoffController
path string
option option
}
@@ -398,6 +400,7 @@ func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.Node
path: path,
option: opt,
schemaRepo: &svc.schemaRepo,
+ handoffCtrl: svc.handoffCtrl,
}
}
@@ -431,7 +434,11 @@ func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB,
Location: path.Join(qs.path, group),
Option: qs.option,
Metrics: qs.newMetrics(p),
- SubQueueCreator: newWriteQueue,
+		SubQueueCreator: func(fileSystem fs.FileSystem, root string, position common.Position,
+			l *logger.Logger, option option, metrics any, group string, shardID common.ShardID, getNodes func() []string,
+		) (*tsTable, error) {
+			return newWriteQueue(fileSystem, root, position, l, option, metrics, group, shardID, getNodes, qs.handoffCtrl)
+ },
GetNodes: func(shardID common.ShardID) []string {
copies := ro.Replicas + 1
nodeSet := make(map[string]struct{}, copies)
diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go
index f361ded5..a36ae9e5 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -79,7 +79,8 @@ func (f *fakeSIDX) Merge(<-chan struct{}, map[uint64]struct{}, uint64) (*sidx.Me
func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, string, uint32, string) ([]queue.StreamingPartData, []func()) {
	panic("not implemented")
}
-func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
+func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return map[uint64]string{} }
+func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
type fakeSIDXWithErr struct {
*fakeSIDX
@@ -647,6 +648,10 @@ func (f *fakeSIDXInfinite) Merge(<-chan struct{}, map[uint64]struct{}, uint64) (
func (f *fakeSIDXInfinite) StreamingParts(map[uint64]struct{}, string, uint32, string) ([]queue.StreamingPartData, []func()) {
panic("not implemented")
}
+
+func (f *fakeSIDXInfinite) PartPaths(map[uint64]struct{}) map[uint64]string {
+ return map[uint64]string{}
+}
+func (f *fakeSIDXInfinite) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
// TestStreamSIDXTraceBatches_InfiniteChannelContinuesUntilCanceled verifies that
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index b503db77..0a5de970 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -20,8 +20,10 @@ package trace
import (
"context"
"errors"
+ "fmt"
"path"
"path/filepath"
+ "sort"
"time"
"github.com/apache/skywalking-banyandb/api/common"
@@ -44,19 +46,22 @@ import (
)
type liaison struct {
- pm protector.Memory
- metadata metadata.Repo
- pipeline queue.Server
- omr observability.MetricsRegistry
- lfs fs.FileSystem
- writeListener bus.MessageListener
- dataNodeSelector node.Selector
- l *logger.Logger
- schemaRepo schemaRepo
- dataPath string
- root string
- option option
- maxDiskUsagePercent int
+ metadata metadata.Repo
+ pipeline queue.Server
+ omr observability.MetricsRegistry
+ lfs fs.FileSystem
+ writeListener bus.MessageListener
+ dataNodeSelector node.Selector
+ pm protector.Memory
+ handoffCtrl *handoffController
+ l *logger.Logger
+ schemaRepo schemaRepo
+ dataPath string
+ root string
+ dataNodeList []string
+ option option
+ maxDiskUsagePercent int
+ handoffMaxSizePercent int
}
var _ Service = (*liaison)(nil)
@@ -89,6 +94,12 @@ func (l *liaison) FlagSet() *run.FlagSet {
	fs.DurationVar(&l.option.flushTimeout, "trace-flush-timeout", 3*time.Second, "the timeout for trace data flush")
	fs.IntVar(&l.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, "the maximum disk usage percentage")
	fs.DurationVar(&l.option.syncInterval, "trace-sync-interval", defaultSyncInterval, "the periodic sync interval for trace data")
+	fs.StringSliceVar(&l.dataNodeList, "data-node-list", nil, "comma-separated list of data node names to monitor for handoff")
+	fs.IntVar(&l.handoffMaxSizePercent, "handoff-max-size-percent", 10,
+		"percentage of BanyanDB's allowed disk usage allocated to handoff storage. "+
+		"Calculated as: totalDisk * trace-max-disk-usage-percent * handoff-max-size-percent / 10000. "+
+		"Example: 100GB disk with 95% max usage and 10% handoff = 9.5GB; 50% handoff = 47.5GB. "+
+		"Valid range: 0-100")
return fs
}
@@ -96,6 +107,17 @@ func (l *liaison) Validate() error {
if l.root == "" {
return errEmptyRootPath
}
+
+ // Validate handoff-max-size-percent is within valid range
+	// Since handoffMaxSizePercent represents the percentage of BanyanDB's allowed disk usage
+	// that goes to handoff storage, it should be between 0-100%
+	if l.handoffMaxSizePercent < 0 || l.handoffMaxSizePercent > 100 {
+		return fmt.Errorf("invalid handoff-max-size-percent: %d%%. Must be between 0 and 100. "+
+			"This represents what percentage of BanyanDB's allowed disk usage is allocated to handoff storage. "+
+			"Example: 100GB disk with 95%% max usage and 50%% handoff = 100 * 95%% * 50%% = 47.5GB for handoff",
+ l.handoffMaxSizePercent)
+ }
+
return nil
}
@@ -122,6 +144,69 @@ func (l *liaison) PreRun(ctx context.Context) error {
}
	traceDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client, l.dataNodeSelector)
+ // Initialize handoff controller if data nodes are configured
+	l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent", l.handoffMaxSizePercent).
+		Msg("handoff configuration")
+	if len(l.dataNodeList) > 0 && l.option.tire2Client != nil {
+		// Calculate max handoff size based on percentage of disk space
+		// Formula: totalDisk * maxDiskUsagePercent * handoffMaxSizePercent / 10000
+		// Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 * 10 / 10000 = 9.5GB
+		maxSize := 0
+		if l.handoffMaxSizePercent > 0 {
+			l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
+			totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+			// Divide after each multiplication to avoid overflow with large disk capacities
+			maxSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
+ maxSize = int(maxSizeBytes / 1024 / 1024)
+ }
+
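+		// Assignment heuristic used by this resolver: data nodes are sorted by name and
+		// shard s takes sortedNodes[(s+replica)%len(sortedNodes)] for each replica.
+		// For example, with nodes [A, B] and copies=2, shard 0 resolves to [A, B] and
+		// shard 1 to [B, A].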
+ // nolint:contextcheck
+		resolveAssignments := func(group string, shardID uint32) ([]string, error) {
+			if l.metadata == nil {
+				return nil, fmt.Errorf("metadata repo is not initialized")
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			groupSchema, err := l.metadata.GroupRegistry().GetGroup(ctx, group)
+			if err != nil {
+				return nil, err
+			}
+			if groupSchema == nil || groupSchema.ResourceOpts == nil {
+				return nil, fmt.Errorf("group %s missing resource options", group)
+			}
+			copies := groupSchema.ResourceOpts.Replicas + 1
+			if len(l.dataNodeList) == 0 {
+				return nil, fmt.Errorf("no data nodes configured for handoff")
+ }
+ sortedNodes := append([]string(nil), l.dataNodeList...)
+ sort.Strings(sortedNodes)
+ nodes := make([]string, 0, copies)
+ seen := make(map[string]struct{}, copies)
+ for replica := uint32(0); replica < copies; replica++ {
+				nodeID := sortedNodes[(int(shardID)+int(replica))%len(sortedNodes)]
+ if _, ok := seen[nodeID]; ok {
+ continue
+ }
+ nodes = append(nodes, nodeID)
+ seen[nodeID] = struct{}{}
+ }
+ return nodes, nil
+ }
+
+ var err error
+ // nolint:contextcheck
+		l.handoffCtrl, err = newHandoffController(l.lfs, l.dataPath, l.option.tire2Client, l.dataNodeList, maxSize, l.l, resolveAssignments)
+ if err != nil {
+ return err
+ }
+ l.l.Info().
+ Int("dataNodes", len(l.dataNodeList)).
+ Int("maxSize", maxSize).
+ Int("maxSizePercent", l.handoffMaxSizePercent).
+ Int("diskUsagePercent", l.maxDiskUsagePercent).
+ Msg("handoff controller initialized")
+ }
+
	l.schemaRepo = newLiaisonSchemaRepo(l.dataPath, l, traceDataNodeRegistry)
	l.writeListener = setUpWriteQueueCallback(l.l, &l.schemaRepo, l.maxDiskUsagePercent, l.option.tire2Client)
@@ -140,6 +225,11 @@ func (l *liaison) Serve() run.StopNotify {
}
func (l *liaison) GracefulStop() {
+ if l.handoffCtrl != nil {
+ if err := l.handoffCtrl.close(); err != nil {
+			l.l.Warn().Err(err).Msg("failed to close handoff controller")
+ }
+ }
if l.schemaRepo.Repository != nil {
l.schemaRepo.Repository.Close()
}
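
A quick way to sanity-check the sizing flags above is to run the PreRun formula by hand. The snippet below is a standalone sketch with made-up inputs (100GB disk, default 95%/10% flags), not part of the patch:

    package main

    import "fmt"

    func main() {
        totalSpace := uint64(100) << 30 // 100 GiB disk, illustrative
        maxDiskUsagePercent := uint64(95)
        handoffMaxSizePercent := uint64(10)
        // Divide after each multiplication, as PreRun does, to avoid uint64 overflow.
        maxSizeBytes := totalSpace * maxDiskUsagePercent / 100 * handoffMaxSizePercent / 100
        fmt.Println(maxSizeBytes) // 10200547328 bytes, i.e. 9.5 GiB
    }
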
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8dd8fea7..02d3ded7 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -294,6 +294,10 @@ func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync map[
return err
}
}
+
+ // After sync attempts, enqueue parts for offline nodes
+ tst.enqueueForOfflineNodes(nodes, partsToSync, partIDsToSync)
+
return nil
}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 4a9645eb..2d83dc25 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -56,6 +56,7 @@ type tsTable struct {
snapshot *snapshot
loopCloser *run.Closer
getNodes func() []string
+ handoffCtrl *handoffController
option option
introductions chan *introduction
p common.Position
@@ -337,6 +338,62 @@ func (tst *tsTable) getAllSidx() map[string]sidx.SIDX {
return result
}
+// enqueueForOfflineNodes enqueues parts for offline nodes via the handoff controller.
+func (tst *tsTable) enqueueForOfflineNodes(onlineNodes []string, partsToSync []*part, partIDsToSync map[uint64]struct{}) {
+ if tst.handoffCtrl == nil {
+ return
+ }
+
+	// Check if there are any offline nodes before doing expensive preparation work
+	offlineNodes := tst.handoffCtrl.calculateOfflineNodes(onlineNodes, tst.group, tst.shardID)
+ tst.l.Debug().
+ Str("group", tst.group).
+ Uint32("shardID", uint32(tst.shardID)).
+ Strs("onlineNodes", onlineNodes).
+ Strs("offlineNodes", offlineNodes).
+ Msg("handoff enqueue evaluation")
+ if len(offlineNodes) == 0 {
+ return
+ }
+
+ // Prepare core parts info
+ coreParts := make([]partInfo, 0, len(partsToSync))
+ for _, part := range partsToSync {
+ coreParts = append(coreParts, partInfo{
+ partID: part.partMetadata.ID,
+ path: part.path,
+ group: tst.group,
+ shardID: tst.shardID,
+ })
+ }
+
+ // Get sidx part paths from each sidx instance
+ sidxMap := tst.getAllSidx()
+ sidxParts := make(map[string][]partInfo)
+ for sidxName, sidxInstance := range sidxMap {
+ partPaths := sidxInstance.PartPaths(partIDsToSync)
+ if len(partPaths) == 0 {
+ continue
+ }
+
+ parts := make([]partInfo, 0, len(partPaths))
+ for partID, path := range partPaths {
+ parts = append(parts, partInfo{
+ partID: partID,
+ path: path,
+ group: tst.group,
+ shardID: tst.shardID,
+ })
+ }
+ sidxParts[sidxName] = parts
+ }
+
+ // Call handoff controller with offline nodes
+	if err := tst.handoffCtrl.enqueueForOfflineNodes(offlineNodes, coreParts, sidxParts); err != nil {
+		tst.l.Warn().Err(err).Msg("handoff enqueue completed with errors")
+ }
+}
+
func (tst *tsTable) loadSidxMap(availablePartIDs []uint64) {
tst.sidxMap = make(map[string]sidx.SIDX)
sidxRootPath := filepath.Join(tst.root, sidxDirName)
diff --git a/banyand/trace/wqueue.go b/banyand/trace/wqueue.go
index ef5584dd..bbc5bcfe 100644
--- a/banyand/trace/wqueue.go
+++ b/banyand/trace/wqueue.go
@@ -34,11 +34,13 @@ type NodeSelector interface {
// newWriteQueue is like newTSTable but does not start the merge loop (or any
background loops).
func newWriteQueue(fileSystem fs.FileSystem, rootPath string, p
common.Position,
l *logger.Logger, option option, m any, group string, shardID
common.ShardID, getNodes func() []string,
+ handoffCtrl *handoffController,
) (*tsTable, error) {
t, epoch := initTSTable(fileSystem, rootPath, p, l, option, m)
t.getNodes = getNodes
t.group = group
t.shardID = shardID
+ t.handoffCtrl = handoffCtrl
t.startLoopWithConditionalMerge(epoch)
return t, nil
}
diff --git a/banyand/trace/wqueue_test.go b/banyand/trace/wqueue_test.go
index 02c20d2a..b93f612e 100644
--- a/banyand/trace/wqueue_test.go
+++ b/banyand/trace/wqueue_test.go
@@ -74,6 +74,7 @@ func Test_newWriteQueue(t *testing.T) {
"test-group",
common.ShardID(1),
func() []string { return []string{"node1", "node2"} },
+ nil, // handoffCtrl
)
require.NoError(t, err)
require.NotNil(t, tst)
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 154c7943..57d46083 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -120,6 +120,8 @@ type FileSystem interface {
SyncPath(path string)
// MustGetFreeSpace returns the free space of the file system.
MustGetFreeSpace(path string) uint64
+ // MustGetTotalSpace returns the total space of the file system.
+ MustGetTotalSpace(path string) uint64
	// CreateHardLink creates hard links in destPath for files in srcPath that pass the filter.
CreateHardLink(srcPath, destPath string, filter func(string) bool) error
}
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 49f24f7b..451f46c1 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -301,6 +301,14 @@ func (fs *localFileSystem) MustGetFreeSpace(path string) uint64 {
return usage.Free
}
+func (fs *localFileSystem) MustGetTotalSpace(path string) uint64 {
+ usage, err := disk.Usage(path)
+ if err != nil {
+		fs.logger.Panic().Str("path", path).Err(err).Msg("failed to get disk usage")
+ }
+ return usage.Total
+}
+
func (fs *localFileSystem) CreateHardLink(srcPath, destPath string, filter func(string) bool) error {
fi, err := os.Stat(srcPath)
if err != nil {
diff --git a/test/integration/handoff/handoff_suite_test.go b/test/integration/handoff/handoff_suite_test.go
new file mode 100644
index 00000000..988720ca
--- /dev/null
+++ b/test/integration/handoff/handoff_suite_test.go
@@ -0,0 +1,584 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handoff_test
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "hash/fnv"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+const (
+ nodeHost = "127.0.0.1"
+ grpcHost = "127.0.0.1"
+ traceShardCount = 2
+)
+
+type dataNodeHandle struct {
+ closeFn func()
+ dataDir string
+ addr string
+ grpcPort int
+ gossipPort int
+}
+
+func newDataNodeHandle(dataDir string, grpcPort, gossipPort int) *dataNodeHandle {
+ return &dataNodeHandle{
+ dataDir: dataDir,
+ grpcPort: grpcPort,
+ gossipPort: gossipPort,
+ addr: fmt.Sprintf("%s:%d", nodeHost, grpcPort),
+ }
+}
+
+func (h *dataNodeHandle) start(etcdEndpoint string) {
+ Expect(h.closeFn).To(BeNil())
+ args := []string{
+ "data",
+ "--grpc-host=" + grpcHost,
+ fmt.Sprintf("--grpc-port=%d", h.grpcPort),
+		fmt.Sprintf("--property-repair-gossip-grpc-port=%d", h.gossipPort),
+ "--stream-root-path=" + h.dataDir,
+ "--measure-root-path=" + h.dataDir,
+ "--property-root-path=" + h.dataDir,
+ "--trace-root-path=" + h.dataDir,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost,
+ "--node-labels", "type=handoff",
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
+ "--measure-flush-timeout=0s",
+ "--stream-flush-timeout=0s",
+ "--trace-flush-timeout=0s",
+ }
+ h.closeFn = setup.CMD(args...)
+
+	Eventually(helpers.HealthCheck(fmt.Sprintf("%s:%d", grpcHost, h.grpcPort), 10*time.Second, 10*time.Second,
+		grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(Succeed())
+
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (h *dataNodeHandle) stop(etcdEndpoint string) {
+ if h.closeFn != nil {
+ h.closeFn()
+ h.closeFn = nil
+ }
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (h *dataNodeHandle) etcdKey() string {
+	return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, h.grpcPort)
+}
+
+type liaisonHandle struct {
+ closeFn func()
+ rootPath string
+ addr string
+ httpAddr string
+ grpcPort int
+ httpPort int
+ serverPort int
+}
+
+func newLiaisonHandle(rootPath string, grpcPort, httpPort, serverPort int) *liaisonHandle {
+ return &liaisonHandle{
+ rootPath: rootPath,
+ grpcPort: grpcPort,
+ httpPort: httpPort,
+ serverPort: serverPort,
+ addr: fmt.Sprintf("%s:%d", grpcHost, grpcPort),
+ httpAddr: fmt.Sprintf("%s:%d", grpcHost, httpPort),
+ }
+}
+
+func (l *liaisonHandle) start(etcdEndpoint string, dataNodes []string) {
+ Expect(l.closeFn).To(BeNil())
+ joined := strings.Join(dataNodes, ",")
+ Expect(os.Setenv("BYDB_DATA_NODE_LIST", joined)).To(Succeed())
+	Expect(os.Setenv("BYDB_DATA_NODE_SELECTOR", "type=handoff")).To(Succeed())
+ args := []string{
+ "liaison",
+ "--grpc-host=" + grpcHost,
+ fmt.Sprintf("--grpc-port=%d", l.grpcPort),
+ "--http-host=" + grpcHost,
+ fmt.Sprintf("--http-port=%d", l.httpPort),
+ "--liaison-server-grpc-host=" + grpcHost,
+ fmt.Sprintf("--liaison-server-grpc-port=%d", l.serverPort),
+ "--http-grpc-addr=" + l.addr,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost,
+ "--stream-root-path=" + l.rootPath,
+ "--measure-root-path=" + l.rootPath,
+ "--trace-root-path=" + l.rootPath,
+ "--stream-flush-timeout=500ms",
+ "--measure-flush-timeout=500ms",
+ "--trace-flush-timeout=500ms",
+ "--stream-sync-interval=1s",
+ "--measure-sync-interval=1s",
+ "--trace-sync-interval=1s",
+ "--handoff-max-size-percent=100",
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
+ }
+
+ l.closeFn = setup.CMD(args...)
+
+	Eventually(helpers.HTTPHealthCheck(l.httpAddr, ""), flags.EventuallyTimeout).Should(Succeed())
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (l *liaisonHandle) stop(etcdEndpoint string) {
+ if l.closeFn != nil {
+ l.closeFn()
+ l.closeFn = nil
+ }
+ _ = os.Unsetenv("BYDB_DATA_NODE_LIST")
+ _ = os.Unsetenv("BYDB_DATA_NODE_SELECTOR")
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (l *liaisonHandle) etcdKey() string {
+	return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, l.serverPort)
+}
+
+type suiteInfo struct {
+ LiaisonAddr string `json:"liaison_addr"`
+ EtcdEndpoint string `json:"etcd_endpoint"`
+ HandoffRoot string `json:"handoff_root"`
+ DataNodes []string `json:"data_nodes"`
+}
+
+func keysToSet[K comparable, V any](m map[K]V) map[K]struct{} {
+ if m == nil {
+ return nil
+ }
+ out := make(map[K]struct{}, len(m))
+ for k := range m {
+ out[k] = struct{}{}
+ }
+ return out
+}
+
+var (
+ connection *grpc.ClientConn
+ goods []gleak.Goroutine
+ cleanupFunc func()
+ handoffRoot string
+ etcdEndpoint string
+
+ dnHandles [2]*dataNodeHandle
+ liaison *liaisonHandle
+
+ etcdServer embeddedetcd.Server
+ etcdSpaceDef func()
+ liaisonDef func()
+ dataDefs []func()
+)
+
+func TestHandoff(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Trace Handoff Suite")
+}
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+	Expect(logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})).To(Succeed())
+ goods = gleak.Goroutines()
+
+ etcdPorts, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+
+ var etcdDir string
+ etcdDir, etcdSpaceDef, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+
+ clientEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[0])
+ peerEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[1])
+
+ etcdServer, err = embeddedetcd.NewServer(
+		embeddedetcd.ConfigureListener([]string{clientEP}, []string{peerEP}),
+ embeddedetcd.RootDir(etcdDir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ <-etcdServer.ReadyNotify()
+ etcdEndpoint = clientEP
+
+ registry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{clientEP}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer registry.Close()
+	Expect(testtrace.PreloadSchema(context.Background(), registry)).To(Succeed())
+
+ dataDefs = make([]func(), 0, 2)
+ for i := range dnHandles {
+ dataDir, def, errDir := test.NewSpace()
+ Expect(errDir).NotTo(HaveOccurred())
+ dataDefs = append(dataDefs, def)
+ ports, errPorts := test.AllocateFreePorts(2)
+ Expect(errPorts).NotTo(HaveOccurred())
+ dnHandles[i] = newDataNodeHandle(dataDir, ports[0], ports[1])
+ dnHandles[i].start(etcdEndpoint)
+ }
+
+ liaisonPath, def, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ liaisonDef = def
+ liaisonPorts, err := test.AllocateFreePorts(3)
+ Expect(err).NotTo(HaveOccurred())
+	liaison = newLiaisonHandle(liaisonPath, liaisonPorts[0], liaisonPorts[1], liaisonPorts[2])
+ nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+ liaison.start(etcdEndpoint, nodeAddrs)
+
+	handoffRoot = filepath.Join(liaisonPath, "trace", "data", "handoff", "nodes")
+
+ cleanupFunc = func() {
+ if liaison != nil {
+ liaison.stop(etcdEndpoint)
+ }
+ for i := range dnHandles {
+ if dnHandles[i] != nil {
+ dnHandles[i].stop(etcdEndpoint)
+ }
+ }
+ if etcdServer != nil {
+ _ = etcdServer.Close()
+ }
+ if liaisonDef != nil {
+ liaisonDef()
+ }
+ for _, def := range dataDefs {
+ if def != nil {
+ def()
+ }
+ }
+ if etcdSpaceDef != nil {
+ etcdSpaceDef()
+ }
+ }
+
+ info := suiteInfo{
+ LiaisonAddr: liaison.addr,
+ EtcdEndpoint: etcdEndpoint,
+ HandoffRoot: handoffRoot,
+ DataNodes: nodeAddrs,
+ }
+ payload, err := json.Marshal(info)
+ Expect(err).NotTo(HaveOccurred())
+ return payload
+}, func(data []byte) {
+ var info suiteInfo
+ Expect(json.Unmarshal(data, &info)).To(Succeed())
+ etcdEndpoint = info.EtcdEndpoint
+ handoffRoot = info.HandoffRoot
+ if liaison == nil {
+ liaison = &liaisonHandle{addr: info.LiaisonAddr}
+ } else {
+ liaison.addr = info.LiaisonAddr
+ }
+
+ var err error
+ connection, err = grpchelper.Conn(info.LiaisonAddr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+})
+
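+// Every test process closes its own gRPC connection once the specs finish.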
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
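+// Cluster teardown and the goroutine-leak check run only when the suite succeeded,
+// so a failed run leaves the on-disk handoff queue in place.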
+var _ = ReportAfterSuite("Trace Handoff Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if cleanupFunc != nil {
+ cleanupFunc()
+ }
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ }
+})
+
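+// The spec stops a data node, writes a trace routed to its shard so the liaison
+// queues the part on disk, then restarts the node and verifies that the queue
+// drains and the replayed trace becomes queryable.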
+var _ = Describe("trace handoff", func() {
+ It("replays data to recovered nodes and empties the queue", func() {
+ Expect(connection).NotTo(BeNil())
+
+ nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+ var (
+ targetIndex = -1
+ traceID string
+ writeTime time.Time
+ targetShard uint32
+ )
+
+ for idx := range dnHandles {
+ candidateShard := shardIDForNode(dnHandles[idx].addr, nodeAddrs)
+ candidateTraceID := generateTraceIDForShard(candidateShard, traceShardCount)
+
+ By(fmt.Sprintf("ensuring the handoff queue for %s starts empty", dnHandles[idx].addr))
+ Expect(countPendingParts(dnHandles[idx].addr)).To(Equal(0))
+
+ By(fmt.Sprintf("stopping %s for shard %d", dnHandles[idx].addr, candidateShard))
+ dnHandles[idx].stop(etcdEndpoint)
+ time.Sleep(7 * time.Second)
+
+ candidateWriteTime := writeTrace(connection, candidateTraceID)
+ found := waitForPendingParts(dnHandles[idx].addr, flags.EventuallyTimeout)
+ if found {
+ targetIndex = idx
+ traceID = candidateTraceID
+ writeTime = candidateWriteTime
+ targetShard = candidateShard
+ break
+ }
+
+ By(fmt.Sprintf("no handoff data detected for %s,
bringing node back online", dnHandles[idx].addr))
+ dnHandles[idx].start(etcdEndpoint)
+ Eventually(func() int {
+ return countPendingParts(dnHandles[idx].addr)
+ }, flags.EventuallyTimeout).Should(Equal(0))
+ }
+
+ Expect(targetIndex).NotTo(Equal(-1), "expected to identify a shard owner for handoff")
+ targetAddr := dnHandles[targetIndex].addr
+
+ By("restarting the offline node after queueing data")
+ dnHandles[targetIndex].start(etcdEndpoint)
+
+ By("waiting for the queue to drain")
+ Eventually(func() int {
+ return countPendingParts(targetAddr)
+ }, flags.EventuallyTimeout).Should(Equal(0))
+
+ By("verifying the replayed trace can be queried")
+ Eventually(func() error {
+ return queryTrace(connection, traceID, writeTime)
+ }, flags.EventuallyTimeout).Should(Succeed())
+
+ var otherIndex int
+ for idx := range dnHandles {
+ if idx != targetIndex {
+ otherIndex = idx
+ break
+ }
+ }
+
+ By("stopping the other node to ensure the trace resides on the
recovered node")
+ dnHandles[otherIndex].stop(etcdEndpoint)
+ defer dnHandles[otherIndex].start(etcdEndpoint)
+
+ Eventually(func() error {
+ return queryTrace(connection, traceID, writeTime)
+ }, flags.EventuallyTimeout).Should(Succeed())
+
+ By(fmt.Sprintf("verified handoff for shard %d via node %s",
targetShard, targetAddr))
+ })
+})
+
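+// writeTrace streams a single span for the given trace ID to the liaison and
+// returns the timestamp used in the write.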
+func writeTrace(conn *grpc.ClientConn, traceID string) time.Time {
+ client := tracev1.NewTraceServiceClient(conn)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ stream, err := client.Write(ctx)
+ Expect(err).NotTo(HaveOccurred())
+
+ baseTime := time.Now().UTC().Truncate(time.Millisecond)
+
+ tags := []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
traceID}}},
+ {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 1}}},
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"handoff_service"}}},
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"handoff_instance"}}},
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"/handoff"}}},
+ {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 321}}},
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "span-"
+ traceID}}},
+ {Value: &modelv1.TagValue_Timestamp{Timestamp:
timestamppb.New(baseTime)}},
+ }
+
+ err = stream.Send(&tracev1.WriteRequest{
+ Metadata: &commonv1.Metadata{Name: "sw", Group:
"test-trace-group"},
+ Tags: tags,
+ Span: []byte("span-" + traceID),
+ Version: 1,
+ })
+ Expect(err).NotTo(HaveOccurred())
+ Expect(stream.CloseSend()).To(Succeed())
+
+ Eventually(func() error {
+ _, err = stream.Recv()
+ return err
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+
+ return baseTime
+}
+
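+// queryTrace looks the trace up by trace_id within a window around the write
+// time and returns an error if it is not found yet.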
+func queryTrace(conn *grpc.ClientConn, traceID string, ts time.Time) error {
+ client := tracev1.NewTraceServiceClient(conn)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ req := &tracev1.QueryRequest{
+ Groups: []string{"test-trace-group"},
+ Name: "sw",
+ TimeRange: &modelv1.TimeRange{
+ Begin: timestamppb.New(ts.Add(-5 * time.Minute)),
+ End: timestamppb.New(ts.Add(5 * time.Minute)),
+ },
+ Criteria: &modelv1.Criteria{
+ Exp: &modelv1.Criteria_Condition{
+ Condition: &modelv1.Condition{
+ Name: "trace_id",
+ Op: modelv1.Condition_BINARY_OP_EQ,
+ Value: &modelv1.TagValue{Value:
&modelv1.TagValue_Str{
+ Str: &modelv1.Str{Value:
traceID},
+ }},
+ },
+ },
+ },
+ TagProjection: []string{"trace_id"},
+ }
+
+ resp, err := client.Query(ctx, req)
+ if err != nil {
+ return err
+ }
+ if len(resp.GetTraces()) == 0 {
+ return fmt.Errorf("trace %s not found", traceID)
+ }
+ return nil
+}
+
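+// countPendingParts returns the number of part directories queued for the node
+// under handoffRoot; a missing node directory counts as zero.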
+func countPendingParts(nodeAddr string) int {
+ sanitized := sanitizeNodeAddr(nodeAddr)
+ nodeDir := filepath.Join(handoffRoot, sanitized)
+
+ entries, err := os.ReadDir(nodeDir)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ return 0
+ }
+ Fail(fmt.Sprintf("failed to read handoff directory %s: %v",
nodeDir, err))
+ }
+
+ count := 0
+ for _, entry := range entries {
+ if entry.IsDir() {
+ count++
+ }
+ }
+ return count
+}
+
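+// sanitizeNodeAddr replaces path-unfriendly characters in a node address so it
+// can be used as a directory name.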
+func sanitizeNodeAddr(addr string) string {
+ replacer := strings.NewReplacer(":", "_", "/", "_", "\\", "_")
+ return replacer.Replace(addr)
+}
+
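+// computeShardForTraceID hashes the trace ID with FNV-1a and maps it onto a shard.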
+func computeShardForTraceID(traceID string, shardCount uint32) uint32 {
+ hasher := fnv.New32a()
+ _, _ = hasher.Write([]byte(traceID))
+ return hasher.Sum32() % shardCount
+}
+
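+// generateTraceIDForShard brute-forces candidate IDs until one hashes to the
+// requested shard.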
+func generateTraceIDForShard(shardID uint32, shardCount uint32) string {
+ for i := 0; i < 10000; i++ {
+ candidate := fmt.Sprintf("handoff-trace-%d-%d", shardID, i)
+ if computeShardForTraceID(candidate, shardCount) == shardID {
+ return candidate
+ }
+ }
+ Fail(fmt.Sprintf("failed to generate trace ID for shard %d", shardID))
+ return ""
+}
+
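+// shardIDForNode derives a node's shard from its position in the sorted node
+// address list, mirroring the assignment order the test relies on.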
+func shardIDForNode(nodeAddr string, nodeAddrs []string) uint32 {
+ if len(nodeAddrs) == 0 {
+ Fail("no data nodes available to determine shard assignment")
+ return 0
+ }
+ sorted := append([]string(nil), nodeAddrs...)
+ sort.Strings(sorted)
+ for idx, addr := range sorted {
+ if addr == nodeAddr {
+ return uint32(idx % len(sorted))
+ }
+ }
+ Fail(fmt.Sprintf("node %s not found in data node list %v", nodeAddr,
nodeAddrs))
+ return 0
+}
+
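+// waitForPendingParts polls the handoff directory until at least one part
+// appears or the timeout elapses.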
+func waitForPendingParts(nodeAddr string, timeout time.Duration) bool {
+ deadline := time.Now().Add(timeout)
+ for {
+ if countPendingParts(nodeAddr) > 0 {
+ return true
+ }
+ if time.Now().After(deadline) {
+ return false
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+}