This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 38f19094 Implement Handoff Queue for Trace (#832)
38f19094 is described below

commit 38f19094145a30c6403fbc4390d0b0cf57494725
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 3 20:53:57 2025 +0800

    Implement Handoff Queue for Trace (#832)
---
 CHANGES.md                                     |    1 +
 banyand/internal/sidx/interfaces.go            |    3 +
 banyand/internal/sidx/sidx.go                  |   51 +
 banyand/internal/sidx/sidx_test.go             |   43 +
 banyand/queue/local.go                         |    4 +
 banyand/queue/pub/pub.go                       |   12 +
 banyand/queue/queue.go                         |    1 +
 banyand/trace/handoff_controller.go            | 1261 ++++++++++++++++++++++++
 banyand/trace/handoff_replay_test.go           |  406 ++++++++
 banyand/trace/handoff_storage.go               |  373 +++++++
 banyand/trace/handoff_storage_test.go          |  692 +++++++++++++
 banyand/trace/metadata.go                      |    9 +-
 banyand/trace/streaming_pipeline_test.go       |    7 +-
 banyand/trace/svc_liaison.go                   |  116 ++-
 banyand/trace/syncer.go                        |    4 +
 banyand/trace/tstable.go                       |   57 ++
 banyand/trace/wqueue.go                        |    2 +
 banyand/trace/wqueue_test.go                   |    1 +
 pkg/fs/file_system.go                          |    2 +
 pkg/fs/local_file_system.go                    |    8 +
 test/integration/handoff/handoff_suite_test.go |  584 +++++++++++
 21 files changed, 3622 insertions(+), 15 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 73fb4557..0b0e8b89 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -53,6 +53,7 @@ Release Notes.
 - Implement Trace Tree for debug mode.
 - Implement bydbQL.
 - UI: Implement the Query Page for BydbQL.
+- Implement the handoff queue for Trace.
 
 ### Bug Fixes
 
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 5d80d6e2..f1570e53 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -58,6 +58,9 @@ type SIDX interface {
        Merge(closeCh <-chan struct{}, partIDstoMerge map[uint64]struct{}, newPartID uint64) (*MergerIntroduction, error)
        // StreamingParts returns the streaming parts.
        StreamingParts(partIDsToSync map[uint64]struct{}, group string, shardID uint32, name string) ([]queue.StreamingPartData, []func())
+       // PartPaths returns filesystem paths for the requested partIDs keyed by partID.
+       // Missing partIDs are omitted from the returned map.
+       PartPaths(partIDs map[uint64]struct{}) map[uint64]string
        // IntroduceSynced introduces a synced map to the SIDX instance.
        IntroduceSynced(partIDsToSync map[uint64]struct{}) func()
 }
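
(A minimal, hypothetical caller sketch, not part of this commit: per the contract above, memory-resident and unknown part IDs are simply absent from the returned map, so a caller can treat missing keys as "not yet on disk". Only the SIDX interface shown here is assumed.)

package example

import "github.com/apache/skywalking-banyandb/banyand/internal/sidx"

// flushedPartPaths is illustrative only: it splits the requested IDs into
// parts that already have a stable on-disk path and parts still pending flush.
func flushedPartPaths(idx sidx.SIDX, partIDs map[uint64]struct{}) (onDisk map[uint64]string, pending []uint64) {
	onDisk = idx.PartPaths(partIDs)
	for id := range partIDs {
		if _, ok := onDisk[id]; !ok {
			pending = append(pending, id)
		}
	}
	return onDisk, pending
}
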
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index e48a7f32..f2cc93e9 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -159,6 +159,57 @@ func (s *sidx) Flush(partIDsToFlush map[uint64]struct{}) (*FlusherIntroduction,
        return flushIntro, nil
 }
 
+// PartPaths implements SIDX interface and returns on-disk paths for the requested part IDs.
+func (s *sidx) PartPaths(partIDs map[uint64]struct{}) map[uint64]string {
+       if len(partIDs) == 0 {
+               return map[uint64]string{}
+       }
+
+       snap := s.currentSnapshot()
+       if snap == nil {
+               return map[uint64]string{}
+       }
+       defer snap.decRef()
+
+       result := make(map[uint64]string, len(partIDs))
+
+       for _, pw := range snap.parts {
+               var (
+                       id   uint64
+                       path string
+               )
+
+               switch {
+               case pw.p != nil && pw.p.partMetadata != nil:
+                       id = pw.p.partMetadata.ID
+                       path = pw.p.path
+               case pw.mp != nil && pw.mp.partMetadata != nil:
+                       id = pw.mp.partMetadata.ID
+               default:
+                       continue
+               }
+
+               if _, ok := partIDs[id]; !ok {
+                       continue
+               }
+               // Skip mem parts since they do not have stable on-disk paths yet.
+               if pw.isMemPart() || pw.p == nil {
+                       continue
+               }
+               if path == "" {
+                       path = partPath(s.root, id)
+               }
+               result[id] = path
+
+               // All requested IDs found.
+               if len(result) == len(partIDs) {
+                       break
+               }
+       }
+
+       return result
+}
+
 // Close implements SIDX interface.
 func (s *sidx) Close() error {
        // Close current snapshot
diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go
index aad90147..ebd9a0c8 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -214,6 +214,49 @@ func TestSIDX_Write_WithTags(t *testing.T) {
        writeTestData(t, sidx, reqs, 3, 3) // Test with segmentID=3, partID=3
 }
 
+func TestSIDX_PartPaths(t *testing.T) {
+       sidxIface := createTestSIDX(t)
+       raw := sidxIface.(*sidx)
+       defer func() {
+               assert.NoError(t, raw.Close())
+       }()
+
+       const (
+               flushedID = uint64(101)
+               memOnlyID = uint64(202)
+               missingID = uint64(303)
+       )
+
+       req := []WriteRequest{
+               createTestWriteRequest(1, 100, "flushed-data"),
+       }
+
+       // Introduce a part that will be flushed to disk and another that stays in memory.
+       writeTestData(t, raw, req, 1, flushedID)
+       writeTestData(t, raw, req, 2, memOnlyID)
+
+       flushIntro, err := raw.Flush(map[uint64]struct{}{flushedID: {}})
+       require.NoError(t, err)
+       require.NotNil(t, flushIntro)
+       raw.IntroduceFlushed(flushIntro)
+       flushIntro.Release()
+
+       // Empty request should return an empty map.
+       require.Empty(t, raw.PartPaths(map[uint64]struct{}{}))
+
+       paths := raw.PartPaths(map[uint64]struct{}{
+               flushedID: {},
+               memOnlyID: {},
+               missingID: {},
+       })
+
+       require.Len(t, paths, 1)
+       expectedPath := partPath(raw.root, flushedID)
+       assert.Equal(t, expectedPath, paths[flushedID])
+       assert.NotContains(t, paths, memOnlyID)
+       assert.NotContains(t, paths, missingID)
+}
+
 // End-to-End Integration Tests.
 
 func TestSIDX_WriteQueryIntegration(t *testing.T) {
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 1bb44a0c..f7fa41de 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -99,6 +99,10 @@ func (*local) GetPort() *uint32 {
 func (*local) Register(bus.Topic, schema.EventHandler) {
 }
 
+func (*local) HealthyNodes() []string {
+       return nil
+}
+
 type localBatchPublisher struct {
        ctx      context.Context
        local    *bus.Bus
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index e8a397ed..fd69d37a 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -480,3 +480,15 @@ func (p *pub) NewChunkedSyncClientWithConfig(node string, config *ChunkedSyncCli
                config:    config,
        }, nil
 }
+
+// HealthyNodes returns a list of node names that are currently healthy and connected.
+func (p *pub) HealthyNodes() []string {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       nodes := make([]string, 0, len(p.active))
+       for name := range p.active {
+               nodes = append(nodes, name)
+       }
+       return nodes
+}
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 14cab1ee..7193f9a9 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -50,6 +50,7 @@ type Client interface {
        Register(bus.Topic, schema.EventHandler)
        OnAddOrUpdate(md schema.Metadata)
        GracefulStop()
+       HealthyNodes() []string
 }
 
 // Server is the interface for receiving data from the queue.
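
(A hypothetical sketch, not part of this commit: HealthyNodes only reports the nodes the publisher currently considers connected, so detecting transitions is left to the caller. The handoff controller below applies this same diffing pattern on a ticker.)

package example

import "github.com/apache/skywalking-banyandb/banyand/queue"

// wentOffline is illustrative only: it compares the previous healthy set with
// the latest HealthyNodes() snapshot and reports nodes that disappeared.
func wentOffline(c queue.Client, previous map[string]struct{}) (offline []string, current map[string]struct{}) {
	current = make(map[string]struct{})
	for _, n := range c.HealthyNodes() {
		current[n] = struct{}{}
	}
	for n := range previous {
		if _, still := current[n]; !still {
			offline = append(offline, n)
		}
	}
	return offline, current
}
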
diff --git a/banyand/trace/handoff_controller.go b/banyand/trace/handoff_controller.go
new file mode 100644
index 00000000..4ba09c2e
--- /dev/null
+++ b/banyand/trace/handoff_controller.go
@@ -0,0 +1,1261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "path/filepath"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// handoffController manages handoff queues for multiple data nodes.
+type handoffController struct {
+       fileSystem              fs.FileSystem
+       tire2Client             queueClient
+       resolveShardAssignments func(group string, shardID uint32) ([]string, error)
+       inFlightSends           map[string]map[uint64]struct{}
+       l                       *logger.Logger
+       replayTriggerChan       chan string
+       nodeQueues              map[string]*handoffNodeQueue
+       replayStopChan          chan struct{}
+       healthyNodes            map[string]struct{}
+       statusChangeChan        chan nodeStatusChange
+       stopMonitor             chan struct{}
+       root                    string
+       allDataNodes            []string
+       monitorWg               sync.WaitGroup
+       replayWg                sync.WaitGroup
+       checkInterval           time.Duration
+       replayBatchSize         int
+       replayPollInterval      time.Duration
+       maxTotalSizeBytes       uint64
+       currentTotalSize        uint64
+       mu                      sync.RWMutex
+       inFlightMu              sync.RWMutex
+       sizeMu                  sync.RWMutex
+}
+
+// nodeStatusChange represents a node status transition.
+type nodeStatusChange struct {
+       nodeName string
+       isOnline bool
+}
+
+// queueClient interface combines health checking and sync client creation.
+type queueClient interface {
+       HealthyNodes() []string
+       NewChunkedSyncClient(node string, chunkSize uint32) (queue.ChunkedSyncClient, error)
+}
+
+// newHandoffController creates a new handoff controller.
+func newHandoffController(fileSystem fs.FileSystem, root string, tire2Client queueClient,
+       dataNodeList []string, maxSize int, l *logger.Logger,
+       resolveShardAssignments func(group string, shardID uint32) ([]string, error),
+) (*handoffController, error) {
+       if fileSystem == nil {
+               return nil, fmt.Errorf("fileSystem is nil")
+       }
+       if l == nil {
+               return nil, fmt.Errorf("logger is nil")
+       }
+       if root == "" {
+               return nil, fmt.Errorf("root path is empty")
+       }
+
+       handoffRoot := filepath.Join(root, "handoff", "nodes")
+
+       hc := &handoffController{
+               l:                       l,
+               fileSystem:              fileSystem,
+               nodeQueues:              make(map[string]*handoffNodeQueue),
+               root:                    handoffRoot,
+               tire2Client:             tire2Client,
+               allDataNodes:            dataNodeList,
+               healthyNodes:            make(map[string]struct{}),
+               statusChangeChan:        make(chan nodeStatusChange, 100),
+               stopMonitor:             make(chan struct{}),
+               checkInterval:           5 * time.Second,
+               replayStopChan:          make(chan struct{}),
+               replayTriggerChan:       make(chan string, 100),
+               inFlightSends:           make(map[string]map[uint64]struct{}),
+               replayBatchSize:         10,
+               replayPollInterval:      1 * time.Second,
+               maxTotalSizeBytes:       uint64(maxSize),
+               currentTotalSize:        0,
+               resolveShardAssignments: resolveShardAssignments,
+       }
+
+       // Create handoff root directory if it doesn't exist
+       fileSystem.MkdirIfNotExist(handoffRoot, storage.DirPerm)
+
+       // Load existing node queues from disk
+       if err := hc.loadExistingQueues(); err != nil {
+               l.Warn().Err(err).Msg("failed to load existing handoff queues")
+       }
+
+       // Start the node status monitor
+       hc.startMonitor()
+
+       // Start the replay worker
+       hc.startReplayWorker()
+
+       return hc, nil
+}
+
+// loadExistingQueues scans the handoff directory and loads existing node queues.
+func (hc *handoffController) loadExistingQueues() error {
+       if hc == nil || hc.fileSystem == nil {
+               return fmt.Errorf("handoff controller is not initialized")
+       }
+
+       entries := hc.fileSystem.ReadDir(hc.root)
+       var totalRecoveredSize uint64
+       var errs []error
+
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       continue
+               }
+
+               nodeRoot := filepath.Join(hc.root, entry.Name())
+
+               // Read the original node address from .node_info file
+               nodeAddr, err := readNodeInfo(hc.fileSystem, nodeRoot)
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("dir", 
entry.Name()).Msg("failed to read node info, skipping")
+                       errs = append(errs, fmt.Errorf("read node info for %s: 
%w", entry.Name(), err))
+                       continue
+               }
+
+               nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed to load node queue")
+                       errs = append(errs, fmt.Errorf("load node queue for %s: %w", nodeAddr, err))
+                       continue
+               }
+
+               hc.nodeQueues[nodeAddr] = nodeQueue
+
+               // Calculate queue size from metadata (not from actual file sizes)
+               pending, err := nodeQueue.listPending()
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed 
to list pending parts")
+                       errs = append(errs, fmt.Errorf("list pending for %s: 
%w", nodeAddr, err))
+                       continue
+               }
+               var queueSize uint64
+               for _, ptp := range pending {
+                       meta, err := nodeQueue.getMetadata(ptp.PartID, ptp.PartType)
+                       if err != nil {
+                               hc.l.Warn().Err(err).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", ptp.PartID).
+                                       Str("partType", ptp.PartType).
+                                       Msg("failed to read part metadata")
+                               errs = append(errs, fmt.Errorf("metadata for node %s part %x (%s): %w",
+                                       nodeAddr, ptp.PartID, ptp.PartType, err))
+                               continue
+                       }
+                       if meta.PartSizeBytes > 0 {
+                               queueSize += meta.PartSizeBytes
+                       }
+               }
+               totalRecoveredSize += queueSize
+
+               // Log pending parts count
+               if len(pending) > 0 {
+                       hc.l.Info().
+                               Str("node", nodeAddr).
+                               Int("pending", len(pending)).
+                               Uint64("sizeMB", queueSize/1024/1024).
+                               Msg("loaded handoff queue with pending parts")
+               }
+       }
+
+       // Update current total size
+       hc.currentTotalSize = totalRecoveredSize
+       if totalRecoveredSize > 0 {
+               hc.l.Info().
+                       Uint64("totalSizeMB", totalRecoveredSize/1024/1024).
+                       Int("nodeCount", len(hc.nodeQueues)).
+                       Msg("recovered handoff queue state")
+       }
+       if len(errs) > 0 {
+               return errors.Join(errs...)
+       }
+
+       return nil
+}
+
+// enqueueForNode adds a part to the handoff queue for a specific node.
+func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, partType string, sourcePath string,
+       group string, shardID uint32,
+) error {
+       // Read part size from metadata
+       partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
+
+       // Check if enqueue would exceed limit
+       if !hc.canEnqueue(partSize) {
+               currentSize := hc.getTotalSize()
+               return fmt.Errorf("handoff queue full: current=%d MB, limit=%d 
MB, part=%d MB",
+                       currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024, 
partSize/1024/1024)
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            group,
+               ShardID:          shardID,
+               PartType:         partType,
+               PartSizeBytes:    partSize,
+       }
+
+       nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+       if err != nil {
+               return fmt.Errorf("failed to get node queue for %s: %w", 
nodeAddr, err)
+       }
+
+       if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != 
nil {
+               return err
+       }
+
+       // Update total size after successful enqueue
+       hc.updateTotalSize(int64(partSize))
+
+       return nil
+}
+
+// enqueueForNodes adds a part to the handoff queues for multiple offline nodes.
+func (hc *handoffController) enqueueForNodes(offlineNodes []string, partID uint64, partType string, sourcePath string,
+       group string, shardID uint32,
+) error {
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            group,
+               ShardID:          shardID,
+               PartType:         partType,
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       var firstErr error
+       successCount := 0
+
+       // For each offline node, create hard-linked copy
+       for _, nodeAddr := range offlineNodes {
+               nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+               if err != nil {
+                       hc.l.Error().Err(err).Str("node", nodeAddr).Msg("failed 
to get node queue")
+                       if firstErr == nil {
+                               firstErr = err
+                       }
+                       continue
+               }
+
+               if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil {
+                       hc.l.Error().Err(err).Str("node", nodeAddr).Uint64("partId", partID).Str("partType", partType).
+                               Msg("failed to enqueue part")
+                       if firstErr == nil {
+                               firstErr = err
+                       }
+                       continue
+               }
+
+               successCount++
+       }
+
+       if successCount > 0 {
+               hc.l.Info().
+                       Int("successCount", successCount).
+                       Int("totalNodes", len(offlineNodes)).
+                       Uint64("partId", partID).
+                       Str("partType", partType).
+                       Msg("part enqueued to handoff queues")
+       }
+
+       // Return error only if all enqueues failed
+       if successCount == 0 && firstErr != nil {
+               return firstErr
+       }
+
+       return nil
+}
+
+// getOrCreateNodeQueue gets an existing node queue or creates a new one.
+// Caller must hold hc.mu lock.
+func (hc *handoffController) getOrCreateNodeQueue(nodeAddr string) (*handoffNodeQueue, error) {
+       // Check if queue already exists
+       if queue, exists := hc.nodeQueues[nodeAddr]; exists {
+               return queue, nil
+       }
+
+       // Create new queue
+       sanitizedAddr := sanitizeNodeAddr(nodeAddr)
+       nodeRoot := filepath.Join(hc.root, sanitizedAddr)
+
+       nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create node queue: %w", err)
+       }
+
+       hc.nodeQueues[nodeAddr] = nodeQueue
+       return nodeQueue, nil
+}
+
+// listPendingForNode returns all pending parts with their types for a specific node.
+func (hc *handoffController) listPendingForNode(nodeAddr string) ([]partTypePair, error) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return nil, nil // No queue means no pending parts
+       }
+
+       return nodeQueue.listPending()
+}
+
+// getPartPath returns the path to a specific part type directory in a node's handoff queue.
+func (hc *handoffController) getPartPath(nodeAddr string, partID uint64, partType string) string {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return ""
+       }
+
+       return nodeQueue.getPartTypePath(partID, partType)
+}
+
+// getPartMetadata returns the handoff metadata for a specific part type.
+func (hc *handoffController) getPartMetadata(nodeAddr string, partID uint64, partType string) (*handoffMetadata, error) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return nil, fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       return nodeQueue.getMetadata(partID, partType)
+}
+
+// completeSend removes a specific part type from a node's handoff queue after successful delivery.
+func (hc *handoffController) completeSend(nodeAddr string, partID uint64, partType string) error {
+       // Get part size before removing
+       var partSize uint64
+       meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+       if err == nil && meta.PartSizeBytes > 0 {
+               partSize = meta.PartSizeBytes
+       }
+
+       hc.mu.RLock()
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       hc.mu.RUnlock()
+
+       if !exists {
+               return fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       if err := nodeQueue.complete(partID, partType); err != nil {
+               return err
+       }
+
+       // Update total size after successful removal
+       if partSize > 0 {
+               hc.updateTotalSize(-int64(partSize))
+       }
+
+       return nil
+}
+
+// completeSendAll removes all part types for a given partID from a node's handoff queue.
+func (hc *handoffController) completeSendAll(nodeAddr string, partID uint64) error {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       return nodeQueue.completeAll(partID)
+}
+
+// getNodeQueueSize returns the total size of pending parts for a specific node.
+func (hc *handoffController) getNodeQueueSize(nodeAddr string) (uint64, error) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return 0, nil
+       }
+
+       return nodeQueue.size()
+}
+
+// getAllNodeQueues returns a snapshot of all node addresses with handoff queues.
+func (hc *handoffController) getAllNodeQueues() []string {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodes := make([]string, 0, len(hc.nodeQueues))
+       for nodeAddr := range hc.nodeQueues {
+               nodes = append(nodes, nodeAddr)
+       }
+
+       return nodes
+}
+
+// partInfo contains information about a part to be enqueued.
+type partInfo struct {
+       path    string
+       group   string
+       partID  uint64
+       shardID common.ShardID
+}
+
+// calculateOfflineNodes returns the list of offline nodes responsible for the shard.
+func (hc *handoffController) calculateOfflineNodes(onlineNodes []string, group string, shardID common.ShardID) []string {
+       if hc == nil {
+               return nil
+       }
+
+       onlineSet := make(map[string]struct{}, len(onlineNodes))
+       for _, node := range onlineNodes {
+               onlineSet[node] = struct{}{}
+       }
+
+       candidates := hc.nodesForShard(group, uint32(shardID))
+       seen := make(map[string]struct{}, len(candidates))
+       var offlineNodes []string
+       for _, node := range candidates {
+               if _, dup := seen[node]; dup {
+                       continue
+               }
+               seen[node] = struct{}{}
+               if !hc.isNodeHealthy(node) {
+                       offlineNodes = append(offlineNodes, node)
+                       continue
+               }
+               if len(onlineSet) > 0 {
+                       if _, isOnline := onlineSet[node]; !isOnline {
+                               offlineNodes = append(offlineNodes, node)
+                       }
+                       continue
+               }
+               if hc.tire2Client == nil {
+                       offlineNodes = append(offlineNodes, node)
+               }
+       }
+
+       return offlineNodes
+}
+
+// enqueueForOfflineNodes enqueues the provided core and sidx parts for each offline node.
+func (hc *handoffController) enqueueForOfflineNodes(
+       offlineNodes []string,
+       coreParts []partInfo,
+       sidxParts map[string][]partInfo,
+) error {
+       if hc == nil || len(offlineNodes) == 0 {
+               return nil
+       }
+
+       group, shardID, hasShardInfo := extractShardDetails(coreParts, sidxParts)
+       if hasShardInfo {
+               filtered := hc.filterNodesForShard(offlineNodes, group, shardID)
+               if len(filtered) == 0 {
+                       hc.l.Debug().
+                               Str("group", group).
+                               Uint32("shardID", shardID).
+                               Msg("no offline shard owners to enqueue")
+                       return nil
+               }
+               offlineNodes = filtered
+       }
+
+       // Track enqueue statistics
+       totalCoreEnqueued := 0
+       totalSidxEnqueued := 0
+       var lastErr error
+
+       // For each offline node, enqueue all parts
+       for _, nodeAddr := range offlineNodes {
+               // Enqueue core parts
+               for _, coreInfo := range coreParts {
+                       err := hc.enqueueForNode(nodeAddr, coreInfo.partID, PartTypeCore, coreInfo.path, coreInfo.group, uint32(coreInfo.shardID))
+                       if err != nil {
+                               hc.l.Warn().Err(err).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", coreInfo.partID).
+                                       Msg("failed to enqueue core part")
+                               lastErr = err
+                       } else {
+                               totalCoreEnqueued++
+                       }
+               }
+
+               // Enqueue sidx parts
+               for sidxName, parts := range sidxParts {
+                       for _, sidxInfo := range parts {
+                               err := hc.enqueueForNode(nodeAddr, sidxInfo.partID, sidxName, sidxInfo.path, sidxInfo.group, uint32(sidxInfo.shardID))
+                               if err != nil {
+                                       hc.l.Warn().Err(err).
+                                               Str("node", nodeAddr).
+                                               Str("sidx", sidxName).
+                                               Uint64("partID", 
sidxInfo.partID).
+                                               Msg("failed to enqueue sidx 
part")
+                                       lastErr = err
+                               } else {
+                                       totalSidxEnqueued++
+                               }
+                       }
+               }
+       }
+
+       // Log summary
+       hc.l.Info().
+               Int("offlineNodes", len(offlineNodes)).
+               Str("group", group).
+               Uint32("shardID", shardID).
+               Int("corePartsEnqueued", totalCoreEnqueued).
+               Int("sidxPartsEnqueued", totalSidxEnqueued).
+               Msg("enqueued parts for offline nodes")
+
+       return lastErr
+}
+
+func extractShardDetails(coreParts []partInfo, sidxParts map[string][]partInfo) (string, uint32, bool) {
+       if len(coreParts) > 0 {
+               return coreParts[0].group, uint32(coreParts[0].shardID), true
+       }
+       for _, parts := range sidxParts {
+               if len(parts) == 0 {
+                       continue
+               }
+               return parts[0].group, uint32(parts[0].shardID), true
+       }
+       return "", 0, false
+}
+
+func (hc *handoffController) nodesForShard(group string, shardID uint32) []string {
+       if hc == nil {
+               return nil
+       }
+       if hc.resolveShardAssignments == nil {
+               return hc.allDataNodes
+       }
+       nodes, err := hc.resolveShardAssignments(group, shardID)
+       if err != nil {
+               if hc.l != nil {
+                       hc.l.Warn().
+                               Err(err).
+                               Str("group", group).
+                               Uint32("shardID", shardID).
+                               Msg("failed to resolve shard assignments, using 
configured node list")
+               }
+               return hc.allDataNodes
+       }
+       if len(nodes) == 0 {
+               return hc.allDataNodes
+       }
+       return nodes
+}
+
+func (hc *handoffController) filterNodesForShard(nodes []string, group string, shardID uint32) []string {
+       candidates := hc.nodesForShard(group, shardID)
+       if len(candidates) == 0 {
+               return nil
+       }
+       candidateSet := make(map[string]struct{}, len(candidates))
+       for _, node := range candidates {
+               candidateSet[node] = struct{}{}
+       }
+       filtered := make([]string, 0, len(nodes))
+       seen := make(map[string]struct{}, len(nodes))
+       for _, node := range nodes {
+               if _, already := seen[node]; already {
+                       continue
+               }
+               seen[node] = struct{}{}
+               if _, ok := candidateSet[node]; ok {
+                       filtered = append(filtered, node)
+               }
+       }
+       return filtered
+}
+
+// close closes the handoff controller.
+func (hc *handoffController) close() error {
+       // Stop the monitor
+       if hc.stopMonitor != nil {
+               close(hc.stopMonitor)
+               hc.monitorWg.Wait()
+       }
+
+       // Stop the replay worker
+       if hc.replayStopChan != nil {
+               close(hc.replayStopChan)
+               hc.replayWg.Wait()
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       // Clear node queues
+       hc.nodeQueues = nil
+
+       // Clear in-flight tracking
+       hc.inFlightSends = nil
+
+       return nil
+}
+
+// getTotalSize returns the current total size across all node queues.
+func (hc *handoffController) getTotalSize() uint64 {
+       hc.sizeMu.RLock()
+       defer hc.sizeMu.RUnlock()
+       return hc.currentTotalSize
+}
+
+// canEnqueue checks if adding a part of the given size would exceed the total size limit.
+func (hc *handoffController) canEnqueue(partSize uint64) bool {
+       if hc.maxTotalSizeBytes == 0 {
+               return true // No limit configured
+       }
+
+       hc.sizeMu.RLock()
+       defer hc.sizeMu.RUnlock()
+       return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+}
+
+// readPartSizeFromMetadata reads the CompressedSizeBytes from the part's metadata file.
+func (hc *handoffController) readPartSizeFromMetadata(sourcePath, partType string) uint64 {
+       var metadataPath string
+
+       // Core parts use metadata.json, sidx parts use manifest.json
+       if partType == PartTypeCore {
+               metadataPath = filepath.Join(sourcePath, metadataFilename) // "metadata.json"
+       } else {
+               metadataPath = filepath.Join(sourcePath, "manifest.json")
+       }
+
+       // Read metadata file
+       data, err := hc.fileSystem.Read(metadataPath)
+       if err != nil {
+               hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to 
read metadata file")
+               return 0
+       }
+
+       // Parse metadata to get CompressedSizeBytes
+       var metadata struct {
+               CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+       }
+       if err := json.Unmarshal(data, &metadata); err != nil {
+               hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to 
parse metadata")
+               return 0
+       }
+
+       return metadata.CompressedSizeBytes
+}
+
+// updateTotalSize atomically updates the current total size.
+func (hc *handoffController) updateTotalSize(delta int64) {
+       hc.sizeMu.Lock()
+       defer hc.sizeMu.Unlock()
+
+       if delta > 0 {
+               // Enqueue: add to total
+               hc.currentTotalSize += uint64(delta)
+       } else if delta < 0 {
+               // Complete: subtract from total with underflow check
+               toSubtract := uint64(-delta)
+               if toSubtract > hc.currentTotalSize {
+                       hc.l.Warn().
+                               Uint64("current", hc.currentTotalSize).
+                               Uint64("toSubtract", toSubtract).
+                               Msg("attempted to subtract more than current 
size, resetting to 0")
+                       hc.currentTotalSize = 0
+               } else {
+                       hc.currentTotalSize -= toSubtract
+               }
+       }
+}
+
+// sanitizeNodeAddr converts a node address to a safe directory name.
+// It replaces colons and other special characters with underscores.
+func sanitizeNodeAddr(addr string) string {
+       // Replace common special characters
+       sanitized := strings.ReplaceAll(addr, ":", "_")
+       sanitized = strings.ReplaceAll(sanitized, "/", "_")
+       sanitized = strings.ReplaceAll(sanitized, "\\", "_")
+       return sanitized
+}
+
+// startMonitor starts the background node status monitoring goroutine.
+func (hc *handoffController) startMonitor() {
+       if hc.tire2Client == nil || len(hc.allDataNodes) == 0 {
+               hc.l.Info().Msg("node status monitor disabled (no tire2 client 
or data nodes)")
+               return
+       }
+
+       // Initialize healthy nodes from tire2 client
+       currentHealthy := hc.tire2Client.HealthyNodes()
+       for _, node := range currentHealthy {
+               hc.healthyNodes[node] = struct{}{}
+       }
+
+       hc.monitorWg.Add(2)
+
+       // Goroutine 1: Periodic polling
+       go hc.pollNodeStatus()
+
+       // Goroutine 2: Handle status changes
+       go hc.handleStatusChanges()
+
+       hc.l.Info().
+               Int("dataNodes", len(hc.allDataNodes)).
+               Dur("checkInterval", hc.checkInterval).
+               Msg("node status monitor started")
+}
+
+// pollNodeStatus periodically polls the pub client for healthy nodes.
+func (hc *handoffController) pollNodeStatus() {
+       defer hc.monitorWg.Done()
+
+       ticker := time.NewTicker(hc.checkInterval)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       hc.checkAndNotifyStatusChanges()
+               case <-hc.stopMonitor:
+                       return
+               }
+       }
+}
+
+// checkAndNotifyStatusChanges compares current vs previous health status.
+func (hc *handoffController) checkAndNotifyStatusChanges() {
+       if hc.tire2Client == nil {
+               return
+       }
+
+       currentHealthy := hc.tire2Client.HealthyNodes()
+       currentHealthySet := make(map[string]struct{})
+       for _, node := range currentHealthy {
+               currentHealthySet[node] = struct{}{}
+       }
+
+       hc.mu.Lock()
+       previousHealthy := hc.healthyNodes
+       hc.healthyNodes = currentHealthySet
+       hc.mu.Unlock()
+
+       // Detect nodes that came online (in current but not in previous)
+       for node := range currentHealthySet {
+               if _, wasHealthy := previousHealthy[node]; !wasHealthy {
+                       select {
+                       case hc.statusChangeChan <- nodeStatusChange{nodeName: node, isOnline: true}:
+                               hc.l.Info().Str("node", node).Msg("detected node coming online")
+                       default:
+                               hc.l.Warn().Str("node", node).Msg("status change channel full")
+                       }
+               }
+       }
+
+       // Detect nodes that went offline (in previous but not in current)
+       for node := range previousHealthy {
+               if _, isHealthy := currentHealthySet[node]; !isHealthy {
+                       select {
+                       case hc.statusChangeChan <- nodeStatusChange{nodeName: node, isOnline: false}:
+                               hc.l.Info().Str("node", node).Msg("detected node going offline")
+                       default:
+                               hc.l.Warn().Str("node", node).Msg("status change channel full")
+                       }
+               }
+       }
+}
+
+// handleStatusChanges processes node status changes.
+func (hc *handoffController) handleStatusChanges() {
+       defer hc.monitorWg.Done()
+
+       for {
+               select {
+               case change := <-hc.statusChangeChan:
+                       if change.isOnline {
+                               hc.onNodeOnline(change.nodeName)
+                       } else {
+                               hc.onNodeOffline(change.nodeName)
+                       }
+               case <-hc.stopMonitor:
+                       return
+               }
+       }
+}
+
+// onNodeOnline handles a node coming online.
+func (hc *handoffController) onNodeOnline(nodeName string) {
+       hc.mu.RLock()
+       _, hasQueue := hc.nodeQueues[nodeName]
+       hc.mu.RUnlock()
+
+       if !hasQueue {
+               hc.l.Debug().Str("node", nodeName).Msg("node online but no 
handoff queue")
+               return
+       }
+
+       // Trigger replay for this node
+       select {
+       case hc.replayTriggerChan <- nodeName:
+               hc.l.Info().Str("node", nodeName).Msg("triggered replay for 
recovered node")
+       default:
+               hc.l.Warn().Str("node", nodeName).Msg("replay trigger channel 
full")
+       }
+}
+
+// onNodeOffline handles a node going offline.
+func (hc *handoffController) onNodeOffline(nodeName string) {
+       hc.l.Info().Str("node", nodeName).Msg("node went offline")
+       // No immediate action needed - syncer will detect send failures
+}
+
+// isNodeHealthy checks if a specific node is currently healthy.
+func (hc *handoffController) isNodeHealthy(nodeName string) bool {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+       _, healthy := hc.healthyNodes[nodeName]
+       return healthy
+}
+
+// startReplayWorker starts the background replay worker goroutine.
+func (hc *handoffController) startReplayWorker() {
+       if hc.tire2Client == nil {
+               hc.l.Info().Msg("replay worker disabled (no tire2 client)")
+               return
+       }
+
+       hc.replayWg.Add(1)
+       go hc.replayWorkerLoop()
+
+       hc.l.Info().
+               Int("batchSize", hc.replayBatchSize).
+               Dur("pollInterval", hc.replayPollInterval).
+               Msg("replay worker started")
+}
+
+// replayWorkerLoop is the main replay worker loop.
+func (hc *handoffController) replayWorkerLoop() {
+       defer hc.replayWg.Done()
+
+       ticker := time.NewTicker(hc.replayPollInterval)
+       defer ticker.Stop()
+
+       // Track nodes that have been triggered for replay
+       triggeredNodes := make(map[string]struct{})
+
+       for {
+               select {
+               case nodeAddr := <-hc.replayTriggerChan:
+                       // Node came online, mark for immediate processing
+                       triggeredNodes[nodeAddr] = struct{}{}
+                       hc.l.Debug().Str("node", nodeAddr).Msg("node marked for 
replay")
+
+               case <-ticker.C:
+                       // Periodic check for work
+                       nodesWithWork := hc.getNodesWithPendingParts()
+
+                       // Process triggered nodes first, then round-robin through all nodes with work
+                       nodesToProcess := make([]string, 0, len(triggeredNodes)+len(nodesWithWork))
+                       for node := range triggeredNodes {
+                               nodesToProcess = append(nodesToProcess, node)
+                       }
+                       for _, node := range nodesWithWork {
+                               if _, alreadyTriggered := triggeredNodes[node]; !alreadyTriggered {
+                                       nodesToProcess = append(nodesToProcess, node)
+                               }
+                       }
+
+                       // Process each node with pending parts (round-robin)
+                       for _, nodeAddr := range nodesToProcess {
+                               select {
+                               case <-hc.replayStopChan:
+                                       return
+                               default:
+                               }
+
+                               // Check if node is healthy
+                               if !hc.isNodeHealthy(nodeAddr) {
+                                       continue
+                               }
+
+                               // Process a batch for this node
+                               processed, err := hc.replayBatchForNode(nodeAddr, hc.replayBatchSize)
+                               if err != nil {
+                                       hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("replay batch failed")
+                               }
+
+                               // If we processed parts successfully, remove from triggered list
+                               if processed > 0 {
+                                       delete(triggeredNodes, nodeAddr)
+                               }
+
+                               // If node has no more pending parts, remove from triggered list
+                               pending, _ := hc.listPendingForNode(nodeAddr)
+                               if len(pending) == 0 {
+                                       delete(triggeredNodes, nodeAddr)
+                               }
+                       }
+
+               case <-hc.replayStopChan:
+                       return
+               }
+       }
+}
+
+// replayBatchForNode processes a batch of parts for a specific node.
+// Returns the number of parts successfully replayed and any error.
+func (hc *handoffController) replayBatchForNode(nodeAddr string, maxParts int) (int, error) {
+       // Get pending parts for this node
+       pending, err := hc.listPendingForNode(nodeAddr)
+       if err != nil {
+               return 0, fmt.Errorf("failed to list pending parts: %w", err)
+       }
+
+       if len(pending) == 0 {
+               return 0, nil
+       }
+
+       // Limit to batch size
+       if len(pending) > maxParts {
+               pending = pending[:maxParts]
+       }
+
+       successCount := 0
+       ctx := context.Background()
+
+       for _, ptp := range pending {
+               // Check if already in-flight
+               if hc.isInFlight(nodeAddr, ptp.PartID) {
+                       hc.l.Debug().
+                               Str("node", nodeAddr).
+                               Uint64("partID", ptp.PartID).
+                               Str("partType", ptp.PartType).
+                               Msg("skipping in-flight part")
+                       continue
+               }
+
+               // Mark as in-flight
+               hc.markInFlight(nodeAddr, ptp.PartID, true)
+
+               // Read part from handoff queue
+               streamingPart, release, err := hc.readPartFromHandoff(nodeAddr, ptp.PartID, ptp.PartType)
+               if err != nil {
+                       hc.l.Error().Err(err).
+                               Str("node", nodeAddr).
+                               Uint64("partID", ptp.PartID).
+                               Str("partType", ptp.PartType).
+                               Msg("failed to read part from handoff")
+                       hc.markInFlight(nodeAddr, ptp.PartID, false)
+                       continue
+               }
+
+               // Send part to node
+               err = hc.sendPartToNode(ctx, nodeAddr, streamingPart)
+               release()
+               if err != nil {
+                       hc.l.Warn().Err(err).
+                               Str("node", nodeAddr).
+                               Uint64("partID", ptp.PartID).
+                               Str("partType", ptp.PartType).
+                               Msg("failed to send part during replay")
+                       hc.markInFlight(nodeAddr, ptp.PartID, false)
+                       continue
+               }
+
+               // Mark as complete
+               if err := hc.completeSend(nodeAddr, ptp.PartID, ptp.PartType); err != nil {
+                       hc.l.Warn().Err(err).
+                               Str("node", nodeAddr).
+                               Uint64("partID", ptp.PartID).
+                               Str("partType", ptp.PartType).
+                               Msg("failed to mark part as complete")
+               }
+
+               // Remove from in-flight
+               hc.markInFlight(nodeAddr, ptp.PartID, false)
+
+               successCount++
+
+               hc.l.Info().
+                       Str("node", nodeAddr).
+                       Uint64("partID", ptp.PartID).
+                       Str("partType", ptp.PartType).
+                       Msg("successfully replayed part")
+       }
+
+       return successCount, nil
+}
+
+// markInFlight marks a part as being sent (or removes the mark).
+func (hc *handoffController) markInFlight(nodeAddr string, partID uint64, inFlight bool) {
+       hc.inFlightMu.Lock()
+       defer hc.inFlightMu.Unlock()
+
+       if inFlight {
+               // Add to in-flight set
+               if hc.inFlightSends[nodeAddr] == nil {
+                       hc.inFlightSends[nodeAddr] = make(map[uint64]struct{})
+               }
+               hc.inFlightSends[nodeAddr][partID] = struct{}{}
+       } else {
+               // Remove from in-flight set
+               if nodeSet, exists := hc.inFlightSends[nodeAddr]; exists {
+                       delete(nodeSet, partID)
+                       if len(nodeSet) == 0 {
+                               delete(hc.inFlightSends, nodeAddr)
+                       }
+               }
+       }
+}
+
+// isInFlight checks if a part is currently being sent to a node.
+func (hc *handoffController) isInFlight(nodeAddr string, partID uint64) bool {
+       hc.inFlightMu.RLock()
+       defer hc.inFlightMu.RUnlock()
+
+       if nodeSet, exists := hc.inFlightSends[nodeAddr]; exists {
+               _, inFlight := nodeSet[partID]
+               return inFlight
+       }
+       return false
+}
+
+// getNodesWithPendingParts returns all node addresses that have pending parts in their queues.
+func (hc *handoffController) getNodesWithPendingParts() []string {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       var nodes []string
+       for nodeAddr, queue := range hc.nodeQueues {
+               pending, _ := queue.listPending()
+               if len(pending) > 0 {
+                       nodes = append(nodes, nodeAddr)
+               }
+       }
+
+       return nodes
+}
+
+// readPartFromHandoff reads a part from the handoff queue and prepares it for sending.
+func (hc *handoffController) readPartFromHandoff(nodeAddr string, partID uint64, partType string) (*queue.StreamingPartData, func(), error) {
+       // Get the path to the hard-linked part
+       partPath := hc.getPartPath(nodeAddr, partID, partType)
+       if partPath == "" {
+               return nil, func() {}, fmt.Errorf("part not found in handoff 
queue")
+       }
+
+       // Get the metadata for this part
+       meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+       if err != nil {
+               return nil, func() {}, fmt.Errorf("failed to get part metadata: 
%w", err)
+       }
+
+       // Read files directly from the filesystem (both core and sidx parts)
+       // The handoff storage uses nested structure: <nodeRoot>/<partID>/<partType>/
+       entries := hc.fileSystem.ReadDir(partPath)
+       var files []queue.FileInfo
+       var buffers []*bytes.Buffer
+
+       for _, entry := range entries {
+               if entry.IsDir() || strings.HasPrefix(entry.Name(), ".") {
+                       continue
+               }
+
+               streamName, include := mapStreamingFileName(partType, entry.Name())
+               if !include {
+                       continue
+               }
+
+               filePath := filepath.Join(partPath, entry.Name())
+               data, err := hc.fileSystem.Read(filePath)
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("file", 
entry.Name()).Msg("failed to read file")
+                       continue
+               }
+
+               // Create a buffer and use its sequential reader
+               buf := bigValuePool.Generate()
+               buf.Buf = append(buf.Buf[:0], data...)
+               buffers = append(buffers, buf)
+
+               files = append(files, queue.FileInfo{
+                       Name:   streamName,
+                       Reader: buf.SequentialRead(),
+               })
+       }
+
+       if len(files) == 0 {
+               for _, buf := range buffers {
+                       bigValuePool.Release(buf)
+               }
+               return nil, func() {}, fmt.Errorf("no files found in part 
directory")
+       }
+
+       // Create streaming part data
+       streamingPart := &queue.StreamingPartData{
+               ID:       partID,
+               Group:    meta.Group,
+               ShardID:  meta.ShardID,
+               Topic:    data.TopicTracePartSync.String(),
+               Files:    files,
+               PartType: partType,
+       }
+
+       // For core parts, read additional metadata from metadata.json if present
+       if partType == PartTypeCore {
+               metadataPath := filepath.Join(partPath, metadataFilename)
+               if metadataBytes, err := hc.fileSystem.Read(metadataPath); err == nil {
+                       var pm partMetadata
+                       if err := json.Unmarshal(metadataBytes, &pm); err == nil {
+                               streamingPart.CompressedSizeBytes = pm.CompressedSizeBytes
+                               streamingPart.UncompressedSizeBytes = pm.UncompressedSpanSizeBytes
+                               streamingPart.TotalCount = pm.TotalCount
+                               streamingPart.BlocksCount = pm.BlocksCount
+                               streamingPart.MinTimestamp = pm.MinTimestamp
+                               streamingPart.MaxTimestamp = pm.MaxTimestamp
+                       }
+               }
+       }
+
+       release := func() {
+               for _, buf := range buffers {
+                       bigValuePool.Release(buf)
+               }
+       }
+
+       return streamingPart, release, nil
+}
+
+func mapStreamingFileName(partType, fileName string) (string, bool) {
+       if partType == PartTypeCore {
+               switch fileName {
+               case primaryFilename:
+                       return tracePrimaryName, true
+               case spansFilename:
+                       return traceSpansName, true
+               case metaFilename:
+                       return traceMetaName, true
+               case traceIDFilterFilename, tagTypeFilename:
+                       return fileName, true
+               case metadataFilename:
+                       return "", false
+               }
+
+               if strings.HasSuffix(fileName, tagsFilenameExt) {
+                       tagName := strings.TrimSuffix(fileName, tagsFilenameExt)
+                       return traceTagsPrefix + tagName, true
+               }
+               if strings.HasSuffix(fileName, tagsMetadataFilenameExt) {
+                       tagName := strings.TrimSuffix(fileName, tagsMetadataFilenameExt)
+                       return traceTagMetadataPrefix + tagName, true
+               }
+
+               return "", false
+       }
+
+       switch fileName {
+       case sidx.SidxPrimaryName + ".bin":
+               return sidx.SidxPrimaryName, true
+       case sidx.SidxDataName + ".bin":
+               return sidx.SidxDataName, true
+       case sidx.SidxKeysName + ".bin":
+               return sidx.SidxKeysName, true
+       case sidx.SidxMetaName + ".bin":
+               return sidx.SidxMetaName, true
+       case "manifest.json":
+               return "", false
+       }
+
+       if strings.HasSuffix(fileName, ".td") {
+               tagName := strings.TrimSuffix(fileName, ".td")
+               return sidx.TagDataPrefix + tagName, true
+       }
+       if strings.HasSuffix(fileName, ".tm") {
+               tagName := strings.TrimSuffix(fileName, ".tm")
+               return sidx.TagMetadataPrefix + tagName, true
+       }
+       if strings.HasSuffix(fileName, ".tf") {
+               tagName := strings.TrimSuffix(fileName, ".tf")
+               return sidx.TagFilterPrefix + tagName, true
+       }
+
+       return "", false
+}
+
+// sendPartToNode sends a single part to a node using ChunkedSyncClient.
+func (hc *handoffController) sendPartToNode(ctx context.Context, nodeAddr string, streamingPart *queue.StreamingPartData) error {
+       // Create chunked sync client
+       chunkedClient, err := hc.tire2Client.NewChunkedSyncClient(nodeAddr, 1024*1024)
+       if err != nil {
+               return fmt.Errorf("failed to create chunked sync client: %w", 
err)
+       }
+       defer chunkedClient.Close()
+
+       // Send the part
+       result, err := chunkedClient.SyncStreamingParts(ctx, []queue.StreamingPartData{*streamingPart})
+       if err != nil {
+               return fmt.Errorf("failed to sync streaming part: %w", err)
+       }
+
+       if !result.Success {
+               return fmt.Errorf("sync failed: %s", result.ErrorMessage)
+       }
+
+       hc.l.Debug().
+               Str("node", nodeAddr).
+               Uint64("partID", streamingPart.ID).
+               Str("partType", streamingPart.PartType).
+               Str("session", result.SessionID).
+               Uint64("bytes", result.TotalBytes).
+               Msg("part sent successfully during replay")
+
+       return nil
+}
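Before the test file below, it helps to see how these helpers compose during replay: list the pending parts for a node, skip anything already in flight, read each part back as a StreamingPartData, send it with sendPartToNode, and only drop it from the queue once delivery succeeded. The sketch below is illustrative only; the committed replayBatchForNode lives earlier in handoff_controller.go and may differ in details such as error handling and metrics, and the method name replaySketch is hypothetical.

// Illustrative sketch of the replay loop, assuming the handoffController
// helpers introduced in this commit. Not the committed implementation.
func (hc *handoffController) replaySketch(ctx context.Context, nodeAddr string, batchSize int) (int, error) {
	pending, err := hc.listPendingForNode(nodeAddr)
	if err != nil {
		return 0, err
	}
	sent := 0
	for _, p := range pending {
		if sent >= batchSize {
			break
		}
		if hc.isInFlight(nodeAddr, p.PartID) {
			continue // a concurrent sender already owns this part
		}
		hc.markInFlight(nodeAddr, p.PartID, true)
		part, release, rErr := hc.readPartFromHandoff(nodeAddr, p.PartID, p.PartType)
		if rErr != nil {
			hc.markInFlight(nodeAddr, p.PartID, false)
			return sent, rErr
		}
		sErr := hc.sendPartToNode(ctx, nodeAddr, part)
		release()
		hc.markInFlight(nodeAddr, p.PartID, false)
		if sErr != nil {
			return sent, sErr
		}
		// Remove the delivered part type; the hard-linked copy is released here.
		if cErr := hc.completeSend(nodeAddr, p.PartID, p.PartType); cErr != nil {
			return sent, cErr
		}
		sent++
	}
	return sent, nil
}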
diff --git a/banyand/trace/handoff_replay_test.go b/banyand/trace/handoff_replay_test.go
new file mode 100644
index 00000000..ec0610cd
--- /dev/null
+++ b/banyand/trace/handoff_replay_test.go
@@ -0,0 +1,406 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "path/filepath"
+       "sync"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+const (
+       testNodeAddrPrimary   = "node1.example.com:17912"
+       testNodeAddrSecondary = "node2.example.com:17912"
+)
+
+// simpleMockClient is a minimal mock for testing replay functionality.
+type simpleMockClient struct {
+       sendError    error
+       healthyNodes []string
+       sendCalled   int
+       mu           sync.Mutex
+}
+
+func newSimpleMockClient(healthyNodes []string) *simpleMockClient {
+       return &simpleMockClient{
+               healthyNodes: healthyNodes,
+       }
+}
+
+func (m *simpleMockClient) HealthyNodes() []string {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       return m.healthyNodes
+}
+
+func (m *simpleMockClient) NewChunkedSyncClient(node string, _ uint32) (queue.ChunkedSyncClient, error) {
+       return &simpleMockChunkedClient{
+               mockClient: m,
+               node:       node,
+       }, nil
+}
+
+func (m *simpleMockClient) getSendCount() int {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       return m.sendCalled
+}
+
+func (m *simpleMockClient) resetSendCount() {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.sendCalled = 0
+}
+
+// simpleMockChunkedClient is a minimal mock for ChunkedSyncClient.
+type simpleMockChunkedClient struct {
+       mockClient *simpleMockClient
+       node       string
+}
+
+func (m *simpleMockChunkedClient) SyncStreamingParts(_ context.Context, parts []queue.StreamingPartData) (*queue.SyncResult, error) {
+       m.mockClient.mu.Lock()
+       m.mockClient.sendCalled++
+       sendError := m.mockClient.sendError
+       m.mockClient.mu.Unlock()
+
+       if sendError != nil {
+               return nil, sendError
+       }
+       return &queue.SyncResult{
+               Success:    true,
+               SessionID:  "test-session",
+               TotalBytes: 1000,
+               PartsCount: uint32(len(parts)),
+       }, nil
+}
+
+func (m *simpleMockChunkedClient) Close() error {
+       return nil
+}
+
+// TestHandoffController_InFlightTracking tests the in-flight tracking mechanism.
+func TestHandoffController_InFlightTracking(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+       partID := uint64(0x10)
+
+       // Initially not in-flight
+       assert.False(t, controller.isInFlight(nodeAddr, partID))
+
+       // Mark as in-flight
+       controller.markInFlight(nodeAddr, partID, true)
+       assert.True(t, controller.isInFlight(nodeAddr, partID))
+
+       // Mark as not in-flight
+       controller.markInFlight(nodeAddr, partID, false)
+       assert.False(t, controller.isInFlight(nodeAddr, partID))
+
+       // Test multiple parts
+       partID2 := uint64(0x11)
+       controller.markInFlight(nodeAddr, partID, true)
+       controller.markInFlight(nodeAddr, partID2, true)
+       assert.True(t, controller.isInFlight(nodeAddr, partID))
+       assert.True(t, controller.isInFlight(nodeAddr, partID2))
+
+       // Remove one
+       controller.markInFlight(nodeAddr, partID, false)
+       assert.False(t, controller.isInFlight(nodeAddr, partID))
+       assert.True(t, controller.isInFlight(nodeAddr, partID2))
+}
+
+// TestHandoffController_GetNodesWithPendingParts tests the node enumeration.
+func TestHandoffController_GetNodesWithPendingParts(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x12)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       nodeAddr1 := testNodeAddrPrimary
+       nodeAddr2 := testNodeAddrSecondary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr1, nodeAddr2}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Initially no nodes with pending parts
+       nodes := controller.getNodesWithPendingParts()
+       assert.Empty(t, nodes)
+
+       // Enqueue for one node
+       err = controller.enqueueForNode(nodeAddr1, partID, PartTypeCore, 
sourcePath, "default", 0)
+       require.NoError(t, err)
+
+       nodes = controller.getNodesWithPendingParts()
+       assert.Len(t, nodes, 1)
+       assert.Contains(t, nodes, nodeAddr1)
+
+       // Enqueue for another node
+       partID2 := uint64(0x13)
+       sourcePath2 := createTestPart(t, fileSystem, sourceRoot, partID2)
+       err = controller.enqueueForNode(nodeAddr2, partID2, PartTypeCore, 
sourcePath2, "default", 0)
+       require.NoError(t, err)
+
+       nodes = controller.getNodesWithPendingParts()
+       assert.Len(t, nodes, 2)
+       assert.Contains(t, nodes, nodeAddr1)
+       assert.Contains(t, nodes, nodeAddr2)
+
+       // Complete one node's parts
+       err = controller.completeSend(nodeAddr1, partID, PartTypeCore)
+       require.NoError(t, err)
+
+       nodes = controller.getNodesWithPendingParts()
+       assert.Len(t, nodes, 1)
+       assert.Contains(t, nodes, nodeAddr2)
+}
+
+// TestHandoffController_ReadPartFromHandoff tests reading parts from handoff storage.
+func TestHandoffController_ReadPartFromHandoff(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x14)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       nodeAddr := testNodeAddrPrimary
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Enqueue a part
+       err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, 
sourcePath, "default", 0)
+       require.NoError(t, err)
+
+       // Read the part back
+       streamingPart, release, err := controller.readPartFromHandoff(nodeAddr, 
partID, PartTypeCore)
+       require.NoError(t, err)
+       require.NotNil(t, streamingPart)
+       release()
+
+       // Verify basic fields
+       assert.Equal(t, partID, streamingPart.ID)
+       assert.Equal(t, "default", streamingPart.Group)
+       assert.Equal(t, PartTypeCore, streamingPart.PartType)
+       assert.NotEmpty(t, streamingPart.Files)
+}
+
+// TestHandoffController_ReplayBatchForNode tests the batch replay logic.
+func TestHandoffController_ReplayBatchForNode(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       nodeAddr := testNodeAddrPrimary
+       // Create mock client
+       mockClient := newSimpleMockClient([]string{nodeAddr})
+
+       // Create source parts
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+       controller, err := newHandoffController(fileSystem, tempDir, 
mockClient, []string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Enqueue multiple parts
+       numParts := 3
+       for i := 0; i < numParts; i++ {
+               partID := uint64(0x20 + i)
+               sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+               err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, 
sourcePath, "default", 0)
+               require.NoError(t, err)
+       }
+
+       // Verify parts are pending
+       pending, err := controller.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Len(t, pending, numParts)
+
+       // Replay batch (should process all 3 parts)
+       count, err := controller.replayBatchForNode(nodeAddr, 10)
+       require.NoError(t, err)
+       assert.Equal(t, numParts, count)
+       assert.Equal(t, numParts, mockClient.getSendCount())
+
+       // Verify all parts completed
+       pending, err = controller.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Empty(t, pending)
+}
+
+// TestHandoffController_ReplayBatchForNode_WithBatchLimit tests batch size limiting.
+func TestHandoffController_ReplayBatchForNode_WithBatchLimit(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       nodeAddr := testNodeAddrPrimary
+       // Create mock client
+       mockClient := newSimpleMockClient([]string{nodeAddr})
+
+       // Create source parts
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+       controller, err := newHandoffController(fileSystem, tempDir, 
mockClient, []string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Enqueue 5 parts
+       numParts := 5
+       for i := 0; i < numParts; i++ {
+               partID := uint64(0x30 + i)
+               sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+               err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, 
sourcePath, "default", 0)
+               require.NoError(t, err)
+       }
+
+       // Replay with batch limit of 2
+       count, err := controller.replayBatchForNode(nodeAddr, 2)
+       require.NoError(t, err)
+       assert.Equal(t, 2, count)
+       assert.Equal(t, 2, mockClient.getSendCount())
+
+       // Verify 3 parts still pending
+       pending, err := controller.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Len(t, pending, 3)
+
+       // Replay again
+       mockClient.resetSendCount()
+       count, err = controller.replayBatchForNode(nodeAddr, 2)
+       require.NoError(t, err)
+       assert.Equal(t, 2, count)
+
+       // Verify 1 part still pending
+       pending, err = controller.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Len(t, pending, 1)
+}
+
+// TestHandoffController_ReplayBatchForNode_SkipsInFlight tests that in-flight parts are skipped.
+func TestHandoffController_ReplayBatchForNode_SkipsInFlight(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       nodeAddr := testNodeAddrPrimary
+       // Create mock client
+       mockClient := newSimpleMockClient([]string{nodeAddr})
+
+       // Create source parts
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+
+       controller, err := newHandoffController(fileSystem, tempDir, 
mockClient, []string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Enqueue parts
+       partID1 := uint64(0x40)
+       partID2 := uint64(0x41)
+       sourcePath1 := createTestPart(t, fileSystem, sourceRoot, partID1)
+       sourcePath2 := createTestPart(t, fileSystem, sourceRoot, partID2)
+
+       err = controller.enqueueForNode(nodeAddr, partID1, PartTypeCore, 
sourcePath1, "default", 0)
+       require.NoError(t, err)
+       err = controller.enqueueForNode(nodeAddr, partID2, PartTypeCore, 
sourcePath2, "default", 0)
+       require.NoError(t, err)
+
+       // Mark first part as in-flight
+       controller.markInFlight(nodeAddr, partID1, true)
+
+       // Replay should skip first part and only process second
+       count, err := controller.replayBatchForNode(nodeAddr, 10)
+       require.NoError(t, err)
+       assert.Equal(t, 1, count)
+
+       // First part should still be pending
+       pending, err := controller.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Len(t, pending, 1)
+       assert.Equal(t, partID1, pending[0].PartID)
+}
+
+// TestHandoffController_SendPartToNode tests sending a part to a node.
+func TestHandoffController_SendPartToNode(t *testing.T) {
+       tempDir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       nodeAddr := testNodeAddrPrimary
+       // Create mock client
+       mockClient := newSimpleMockClient([]string{nodeAddr})
+
+       controller, err := newHandoffController(fileSystem, tempDir, 
mockClient, []string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+       defer controller.close()
+
+       // Create a streaming part
+       streamingPart := &queue.StreamingPartData{
+               ID:       0x50,
+               Group:    "default",
+               ShardID:  0,
+               PartType: PartTypeCore,
+               Files:    []queue.FileInfo{},
+       }
+
+       // Send the part
+       err = controller.sendPartToNode(context.Background(), 
testNodeAddrPrimary, streamingPart)
+       require.NoError(t, err)
+       assert.Equal(t, 1, mockClient.getSendCount())
+}
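The simpleMockClient above also exposes a sendError field, which makes the failure path of sendPartToNode easy to cover (the error from SyncStreamingParts is wrapped and returned, as handoff_controller.go shows). The companion test below is illustrative only, not part of this commit, and assumes an extra "errors" import.

// Illustrative only: mirrors TestHandoffController_SendPartToNode but forces a send failure.
func TestHandoffController_SendPartToNode_Failure(t *testing.T) {
	tempDir, defFn := test.Space(require.New(t))
	defer defFn()

	fileSystem := fs.NewLocalFileSystem()
	l := logger.GetLogger("test")

	nodeAddr := testNodeAddrPrimary
	mockClient := newSimpleMockClient([]string{nodeAddr})
	mockClient.sendError = errors.New("connection refused")

	controller, err := newHandoffController(fileSystem, tempDir, mockClient, []string{nodeAddr}, 0, l, nil)
	require.NoError(t, err)
	defer controller.close()

	streamingPart := &queue.StreamingPartData{
		ID:       0x51,
		Group:    "default",
		ShardID:  0,
		PartType: PartTypeCore,
		Files:    []queue.FileInfo{},
	}

	// The mock records the attempt, then returns the injected error.
	err = controller.sendPartToNode(context.Background(), nodeAddr, streamingPart)
	require.Error(t, err)
	assert.Equal(t, 1, mockClient.getSendCount())
}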
diff --git a/banyand/trace/handoff_storage.go b/banyand/trace/handoff_storage.go
new file mode 100644
index 00000000..b1ccc5b2
--- /dev/null
+++ b/banyand/trace/handoff_storage.go
@@ -0,0 +1,373 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "encoding/json"
+       "fmt"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       handoffMetaFilename = ".handoff_meta"
+       nodeInfoFilename    = ".node_info"
+)
+
+// handoffMetadata contains metadata for a handoff queue entry.
+type handoffMetadata struct {
+       Group            string `json:"group"`
+       PartType         string `json:"part_type"`
+       EnqueueTimestamp int64  `json:"enqueue_timestamp"`
+       PartSizeBytes    uint64 `json:"part_size_bytes"`
+       ShardID          uint32 `json:"shard_id"`
+}
+
+// handoffNodeQueue manages the handoff queue for a single data node.
+// It uses a per-node directory with hard-linked part directories.
+type handoffNodeQueue struct {
+       fileSystem fs.FileSystem
+       l          *logger.Logger
+       nodeAddr   string
+       root       string
+       mu         sync.RWMutex
+}
+
+// newHandoffNodeQueue creates a new handoff queue for a specific node.
+func newHandoffNodeQueue(nodeAddr, root string, fileSystem fs.FileSystem, l *logger.Logger) (*handoffNodeQueue, error) {
+       if fileSystem == nil {
+               return nil, fmt.Errorf("fileSystem is nil")
+       }
+       if l == nil {
+               return nil, fmt.Errorf("logger is nil")
+       }
+       if nodeAddr == "" {
+               return nil, fmt.Errorf("node address is empty")
+       }
+       if root == "" {
+               return nil, fmt.Errorf("queue root path is empty")
+       }
+
+       hnq := &handoffNodeQueue{
+               nodeAddr:   nodeAddr,
+               root:       root,
+               fileSystem: fileSystem,
+               l:          l,
+       }
+
+       // Create node queue directory if it doesn't exist
+       fileSystem.MkdirIfNotExist(root, storage.DirPerm)
+
+       // Write node info file to persist the original node address
+       if err := hnq.writeNodeInfo(); err != nil {
+               l.Warn().Err(err).Msg("failed to write node info")
+       }
+
+       return hnq, nil
+}
+
+// writeNodeInfo writes the original node address to a metadata file.
+func (hnq *handoffNodeQueue) writeNodeInfo() error {
+       nodeInfoPath := filepath.Join(hnq.root, nodeInfoFilename)
+
+       // Check if already exists
+       if _, err := hnq.fileSystem.Read(nodeInfoPath); err == nil {
+               return nil // Already exists
+       }
+
+       lf, err := hnq.fileSystem.CreateLockFile(nodeInfoPath, storage.FilePerm)
+       if err != nil {
+               return fmt.Errorf("failed to create node info file: %w", err)
+       }
+
+       _, err = lf.Write([]byte(hnq.nodeAddr))
+       return err
+}
+
+// readNodeInfo reads the original node address from the metadata file.
+func readNodeInfo(fileSystem fs.FileSystem, root string) (string, error) {
+       nodeInfoPath := filepath.Join(root, nodeInfoFilename)
+       data, err := fileSystem.Read(nodeInfoPath)
+       if err != nil {
+               return "", fmt.Errorf("failed to read node info: %w", err)
+       }
+       return string(data), nil
+}
+
+// enqueue adds a part to the handoff queue by creating hard links and writing metadata.
+// Uses nested structure: <nodeRoot>/<partId>/<partType>/.
+func (hnq *handoffNodeQueue) enqueue(partID uint64, partType string, sourcePath string, meta *handoffMetadata) error {
+       hnq.mu.Lock()
+       defer hnq.mu.Unlock()
+
+       dstPath := hnq.getPartTypePath(partID, partType)
+
+       // Check if already exists (idempotent)
+       partIDDir := hnq.getPartIDDir(partID)
+
+       // Create parent directory for partID if it doesn't exist
+       hnq.fileSystem.MkdirIfNotExist(partIDDir, storage.DirPerm)
+
+       // Now check if part type already exists
+       partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+       for _, entry := range partTypeEntries {
+               if entry.Name() == partType && entry.IsDir() {
+                       return nil // Already enqueued
+               }
+       }
+
+       // Create hardlinks directly to destination
+       if err := copyPartDirectory(hnq.fileSystem, sourcePath, dstPath); err != nil {
+               // Clean up partial copy on failure
+               hnq.fileSystem.MustRMAll(dstPath)
+               return fmt.Errorf("failed to hardlink part: %w", err)
+       }
+
+       // Write sidecar metadata file after hardlinks are created
+       metaPath := filepath.Join(dstPath, handoffMetaFilename)
+       metaBytes, err := json.Marshal(meta)
+       if err != nil {
+               hnq.fileSystem.MustRMAll(dstPath)
+               return fmt.Errorf("failed to marshal metadata: %w", err)
+       }
+
+       lf, err := hnq.fileSystem.CreateLockFile(metaPath, storage.FilePerm)
+       if err != nil {
+               hnq.fileSystem.MustRMAll(dstPath)
+               return fmt.Errorf("failed to create metadata file: %w", err)
+       }
+       if _, err := lf.Write(metaBytes); err != nil {
+               hnq.fileSystem.MustRMAll(dstPath)
+               return fmt.Errorf("failed to write metadata: %w", err)
+       }
+
+       hnq.l.Info().
+               Str("node", hnq.nodeAddr).
+               Uint64("partId", partID).
+               Str("partType", partType).
+               Str("path", dstPath).
+               Msg("part enqueued to handoff queue")
+
+       return nil
+}
+
+// partTypePair represents a pending part with its type.
+type partTypePair struct {
+       PartType string
+       PartID   uint64
+}
+
+// listPending returns a sorted list of all pending part IDs with their types.
+func (hnq *handoffNodeQueue) listPending() ([]partTypePair, error) {
+       hnq.mu.RLock()
+       defer hnq.mu.RUnlock()
+
+       entries := hnq.fileSystem.ReadDir(hnq.root)
+       var pairs []partTypePair
+
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       continue
+               }
+
+               // Skip temp directories and metadata files
+               if filepath.Ext(entry.Name()) == ".tmp" || entry.Name() == nodeInfoFilename {
+                       continue
+               }
+
+               // Parse partId from hex directory name
+               partID, err := parsePartID(entry.Name())
+               if err != nil {
+                       hnq.l.Warn().Str("name", entry.Name()).Msg("invalid part directory")
+                       continue
+               }
+
+               // List part types under this partId
+               partIDDir := filepath.Join(hnq.root, entry.Name())
+               partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+               for _, partTypeEntry := range partTypeEntries {
+                       if partTypeEntry.IsDir() {
+                               pairs = append(pairs, partTypePair{
+                                       PartID:   partID,
+                                       PartType: partTypeEntry.Name(),
+                               })
+                       }
+               }
+       }
+
+       // Sort by partID first, then by partType
+       sort.Slice(pairs, func(i, j int) bool {
+               if pairs[i].PartID == pairs[j].PartID {
+                       return pairs[i].PartType < pairs[j].PartType
+               }
+               return pairs[i].PartID < pairs[j].PartID
+       })
+
+       return pairs, nil
+}
+
+// getMetadata reads the handoff metadata for a specific part type.
+func (hnq *handoffNodeQueue) getMetadata(partID uint64, partType string) (*handoffMetadata, error) {
+       hnq.mu.RLock()
+       defer hnq.mu.RUnlock()
+
+       metaPath := filepath.Join(hnq.getPartTypePath(partID, partType), handoffMetaFilename)
+       data, err := hnq.fileSystem.Read(metaPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to read metadata: %w", err)
+       }
+
+       var meta handoffMetadata
+       if err := json.Unmarshal(data, &meta); err != nil {
+               return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
+       }
+
+       return &meta, nil
+}
+
+// getPartIDDir returns the directory path for a partID (contains all part types).
+func (hnq *handoffNodeQueue) getPartIDDir(partID uint64) string {
+       return filepath.Join(hnq.root, partName(partID))
+}
+
+// getPartTypePath returns the full path to a specific part type directory.
+func (hnq *handoffNodeQueue) getPartTypePath(partID uint64, partType string) string {
+       return filepath.Join(hnq.getPartIDDir(partID), partType)
+}
+
+// complete removes a specific part type from the handoff queue after successful delivery.
+func (hnq *handoffNodeQueue) complete(partID uint64, partType string) error {
+       hnq.mu.Lock()
+       defer hnq.mu.Unlock()
+
+       dstPath := hnq.getPartTypePath(partID, partType)
+
+       // Remove the part type directory
+       hnq.fileSystem.MustRMAll(dstPath)
+
+       // Check if partID directory is now empty, if so remove it too
+       partIDDir := hnq.getPartIDDir(partID)
+       entries := hnq.fileSystem.ReadDir(partIDDir)
+       isEmpty := true
+       for _, entry := range entries {
+               if entry.IsDir() {
+                       isEmpty = false
+                       break
+               }
+       }
+       if isEmpty {
+               hnq.fileSystem.MustRMAll(partIDDir)
+       }
+
+       hnq.l.Info().
+               Str("node", hnq.nodeAddr).
+               Uint64("partId", partID).
+               Str("partType", partType).
+               Msg("part type removed from handoff queue")
+
+       return nil
+}
+
+// completeAll removes all part types for a given partID.
+func (hnq *handoffNodeQueue) completeAll(partID uint64) error {
+       hnq.mu.Lock()
+       defer hnq.mu.Unlock()
+
+       partIDDir := hnq.getPartIDDir(partID)
+
+       // Simply remove the entire partID directory
+       hnq.fileSystem.MustRMAll(partIDDir)
+
+       hnq.l.Info().
+               Str("node", hnq.nodeAddr).
+               Uint64("partId", partID).
+               Msg("all part types removed from handoff queue")
+
+       return nil
+}
+
+// size returns the total size of all pending parts in bytes.
+func (hnq *handoffNodeQueue) size() (uint64, error) {
+       hnq.mu.RLock()
+       defer hnq.mu.RUnlock()
+
+       partIDEntries := hnq.fileSystem.ReadDir(hnq.root)
+       var totalSize uint64
+
+       for _, partIDEntry := range partIDEntries {
+               if !partIDEntry.IsDir() || partIDEntry.Name() == nodeInfoFilename {
+                       continue
+               }
+
+               // For each partID directory
+               partIDDir := filepath.Join(hnq.root, partIDEntry.Name())
+               partTypeEntries := hnq.fileSystem.ReadDir(partIDDir)
+
+               for _, partTypeEntry := range partTypeEntries {
+                       if !partTypeEntry.IsDir() {
+                               continue
+                       }
+
+                       // For each part type directory
+                       partTypePath := filepath.Join(partIDDir, partTypeEntry.Name())
+                       partFiles := hnq.fileSystem.ReadDir(partTypePath)
+
+                       for _, partFile := range partFiles {
+                               if partFile.IsDir() {
+                                       continue
+                               }
+
+                               filePath := filepath.Join(partTypePath, partFile.Name())
+                               file, err := hnq.fileSystem.OpenFile(filePath)
+                               if err != nil {
+                                       continue // Skip files we can't open
+                               }
+                               size, _ := file.Size()
+                               fs.MustClose(file)
+                               if size > 0 {
+                                       totalSize += uint64(size)
+                               }
+                       }
+               }
+       }
+
+       return totalSize, nil
+}
+
+// copyPartDirectory creates hardlinks from srcDir to dstDir using fs.CreateHardLink.
+func copyPartDirectory(fileSystem fs.FileSystem, srcDir, dstDir string) error {
+       if err := fileSystem.CreateHardLink(srcDir, dstDir, nil); err != nil {
+               return fmt.Errorf("failed to create hardlinks from %s to %s: %w", srcDir, dstDir, err)
+       }
+       return nil
+}
+
+// parsePartID parses a part ID from a hex string directory name.
+func parsePartID(name string) (uint64, error) {
+       partID, err := strconv.ParseUint(name, 16, 64)
+       if err != nil {
+               return 0, fmt.Errorf("cannot parse part ID %s: %w", name, err)
+       }
+       return partID, nil
+}
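Taken together, this file gives every offline node its own queue directory, hard-links each part under <nodeRoot>/<partID in hex>/<partType>/ next to a .handoff_meta sidecar, and removes the part-type directory (and the then-empty partID directory) once delivery completes. The sketch below is illustrative only, not part of this commit; the paths and the helper name exampleNodeQueueRoundTrip are hypothetical, and it assumes an extra "time" import.

// Illustrative only: driving a single node queue directly.
func exampleNodeQueueRoundTrip(fileSystem fs.FileSystem, l *logger.Logger) error {
	q, err := newHandoffNodeQueue("node1.example.com:17912", "/tmp/handoff/node1.example.com_17912", fileSystem, l)
	if err != nil {
		return err
	}
	meta := &handoffMetadata{
		Group:            "default",
		PartType:         PartTypeCore,
		EnqueueTimestamp: time.Now().UnixNano(),
		ShardID:          0,
	}
	// Hard-links the source part directory into <nodeRoot>/<partID>/<partType>/
	// and writes the .handoff_meta sidecar next to the linked files.
	if err := q.enqueue(0x1a, PartTypeCore, "/data/trace/parts/000000000000001a", meta); err != nil {
		return err
	}
	pending, err := q.listPending() // one entry: {PartID: 0x1a, PartType: PartTypeCore}
	if err != nil {
		return err
	}
	for _, p := range pending {
		// ... deliver the part to the node, then release the queued copy.
		if err := q.complete(p.PartID, p.PartType); err != nil {
			return err
		}
	}
	return nil
}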
diff --git a/banyand/trace/handoff_storage_test.go b/banyand/trace/handoff_storage_test.go
new file mode 100644
index 00000000..93084aff
--- /dev/null
+++ b/banyand/trace/handoff_storage_test.go
@@ -0,0 +1,692 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "encoding/json"
+       "errors"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+const megabyte = 1024 * 1024
+
+type fakeQueueClient struct {
+       healthy []string
+}
+
+func (f *fakeQueueClient) HealthyNodes() []string {
+       return append([]string(nil), f.healthy...)
+}
+
+func (f *fakeQueueClient) NewChunkedSyncClient(string, uint32) (queue.ChunkedSyncClient, error) {
+       return nil, errors.New("not implemented")
+}
+
+func setupHandoffTest(t *testing.T) (string, fs.FileSystem, *logger.Logger) {
+       tempDir, deferFn := test.Space(require.New(t))
+       t.Cleanup(deferFn)
+
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("handoff-test")
+
+       return tempDir, fileSystem, l
+}
+
+func createTestPart(t *testing.T, fileSystem fs.FileSystem, root string, partID uint64) string {
+       partPath := filepath.Join(root, partName(partID))
+       fileSystem.MkdirIfNotExist(partPath, storage.DirPerm)
+
+       // Create test files
+       files := map[string][]byte{
+               "meta.bin":       []byte("test meta data"),
+               "primary.bin":    []byte("test primary data"),
+               "spans.bin":      []byte("test spans data"),
+               "metadata.json":  []byte(`{"id":1,"totalCount":100}`),
+               "tag.type":       []byte("test tag type"),
+               "traceID.filter": []byte("test filter"),
+       }
+
+       for filename, content := range files {
+               filePath := filepath.Join(partPath, filename)
+               lf, err := fileSystem.CreateLockFile(filePath, storage.FilePerm)
+               require.NoError(t, err)
+               _, err = lf.Write(content)
+               require.NoError(t, err)
+       }
+
+       return partPath
+}
+
+func TestHandoffNodeQueue_EnqueueCore(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1a)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff node queue
+       queueRoot := filepath.Join(tempDir, "handoff", "node1")
+       nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", 
queueRoot, fileSystem, l)
+       require.NoError(t, err)
+
+       // Test enqueue core part
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            "default",
+               ShardID:          0,
+               PartType:         PartTypeCore,
+       }
+
+       err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+       require.NoError(t, err)
+
+       // Verify nested structure: <partID>/core/
+       dstPath := nodeQueue.getPartTypePath(partID, PartTypeCore)
+       entries := fileSystem.ReadDir(dstPath)
+       assert.NotEmpty(t, entries)
+
+       // Verify metadata file exists
+       metaPath := filepath.Join(dstPath, handoffMetaFilename)
+       metaData, err := fileSystem.Read(metaPath)
+       require.NoError(t, err)
+       assert.NotEmpty(t, metaData)
+}
+
+func TestHandoffNodeQueue_EnqueueMultiplePartTypes(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source parts
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1b)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff node queue
+       queueRoot := filepath.Join(tempDir, "handoff", "node1")
+       nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", 
queueRoot, fileSystem, l)
+       require.NoError(t, err)
+
+       // Enqueue core part
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            "default",
+               ShardID:          0,
+               PartType:         PartTypeCore,
+       }
+       err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+       require.NoError(t, err)
+
+       // Enqueue sidx part (simulated with same source)
+       meta.PartType = "sidx_trace_id"
+       err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+       require.NoError(t, err)
+
+       // Enqueue another sidx part
+       meta.PartType = "sidx_service"
+       err = nodeQueue.enqueue(partID, "sidx_service", sourcePath, meta)
+       require.NoError(t, err)
+
+       // List pending - should have 3 part types for same partID
+       pending, err := nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 3)
+
+       // Verify all belong to same partID
+       for _, pair := range pending {
+               assert.Equal(t, partID, pair.PartID)
+       }
+
+       // Verify part types
+       partTypes := make(map[string]bool)
+       for _, pair := range pending {
+               partTypes[pair.PartType] = true
+       }
+       assert.True(t, partTypes[PartTypeCore])
+       assert.True(t, partTypes["sidx_trace_id"])
+       assert.True(t, partTypes["sidx_service"])
+}
+
+func TestHandoffNodeQueue_Complete(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1c)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff node queue
+       queueRoot := filepath.Join(tempDir, "handoff", "node1")
+       nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", 
queueRoot, fileSystem, l)
+       require.NoError(t, err)
+
+       // Enqueue core and sidx parts
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            "default",
+               ShardID:          0,
+               PartType:         PartTypeCore,
+       }
+       err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+       require.NoError(t, err)
+
+       meta.PartType = "sidx_trace_id"
+       err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+       require.NoError(t, err)
+
+       // Verify both are pending
+       pending, err := nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 2)
+
+       // Complete core part only
+       err = nodeQueue.complete(partID, PartTypeCore)
+       require.NoError(t, err)
+
+       // Verify only sidx remains
+       pending, err = nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 1)
+       assert.Equal(t, "sidx_trace_id", pending[0].PartType)
+
+       // Complete remaining part
+       err = nodeQueue.complete(partID, "sidx_trace_id")
+       require.NoError(t, err)
+
+       // Verify nothing pending
+       pending, err = nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 0)
+
+       // Verify partID directory is also removed
+       partIDDir := nodeQueue.getPartIDDir(partID)
+       _, err = os.Stat(partIDDir)
+       assert.True(t, os.IsNotExist(err))
+}
+
+func TestHandoffNodeQueue_CompleteAll(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1d)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff node queue
+       queueRoot := filepath.Join(tempDir, "handoff", "node1")
+       nodeQueue, err := newHandoffNodeQueue("node1.example.com:17912", 
queueRoot, fileSystem, l)
+       require.NoError(t, err)
+
+       // Enqueue multiple part types
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            "default",
+               ShardID:          0,
+               PartType:         PartTypeCore,
+       }
+       err = nodeQueue.enqueue(partID, PartTypeCore, sourcePath, meta)
+       require.NoError(t, err)
+       err = nodeQueue.enqueue(partID, "sidx_trace_id", sourcePath, meta)
+       require.NoError(t, err)
+       err = nodeQueue.enqueue(partID, "sidx_service", sourcePath, meta)
+       require.NoError(t, err)
+
+       // Verify all are pending
+       pending, err := nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 3)
+
+       // Complete all at once
+       err = nodeQueue.completeAll(partID)
+       require.NoError(t, err)
+
+       // Verify nothing pending
+       pending, err = nodeQueue.listPending()
+       require.NoError(t, err)
+       assert.Len(t, pending, 0)
+}
+
+func TestHandoffController_EnqueueForNodes(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1e)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff controller
+       offlineNodes := []string{"node1.example.com:17912", 
"node2.example.com:17912"}
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
offlineNodes, 0, l, nil)
+       require.NoError(t, err)
+
+       // Enqueue for multiple nodes
+       err = controller.enqueueForNodes(offlineNodes, partID, PartTypeCore, 
sourcePath, "default", 0)
+       require.NoError(t, err)
+
+       // Verify both nodes have the part
+       for _, nodeAddr := range offlineNodes {
+               pending, err := controller.listPendingForNode(nodeAddr)
+               require.NoError(t, err)
+               assert.Len(t, pending, 1)
+               assert.Equal(t, partID, pending[0].PartID)
+               assert.Equal(t, PartTypeCore, pending[0].PartType)
+       }
+}
+
+func TestHandoffController_GetPartPath(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x1f)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       // Create handoff controller
+       nodeAddr := "node1.example.com:17912"
+       controller, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+
+       // Enqueue for node
+       err = controller.enqueueForNode(nodeAddr, partID, PartTypeCore, 
sourcePath, "default", 0)
+       require.NoError(t, err)
+
+       // Get part path
+       partPath := controller.getPartPath(nodeAddr, partID, PartTypeCore)
+       assert.NotEmpty(t, partPath)
+
+       // Verify path exists
+       _, err = os.Stat(partPath)
+       require.NoError(t, err)
+
+       // Verify it's the nested structure
+       assert.Contains(t, partPath, partName(partID))
+       assert.Contains(t, partPath, PartTypeCore)
+}
+
+func TestHandoffController_LoadExistingQueues(t *testing.T) {
+       tempDir, fileSystem, l := setupHandoffTest(t)
+
+       // Create source part
+       sourceRoot := filepath.Join(tempDir, "source")
+       fileSystem.MkdirIfNotExist(sourceRoot, storage.DirPerm)
+       partID := uint64(0x20)
+       sourcePath := createTestPart(t, fileSystem, sourceRoot, partID)
+
+       nodeAddr := "node1.example.com:17912"
+
+       // Create first controller and enqueue part
+       controller1, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+
+       err = controller1.enqueueForNode(nodeAddr, partID, PartTypeCore, 
sourcePath, "default", 0)
+       require.NoError(t, err)
+
+       // Close first controller
+       err = controller1.close()
+       require.NoError(t, err)
+
+       // Create second controller (should load existing queues)
+       controller2, err := newHandoffController(fileSystem, tempDir, nil, 
[]string{nodeAddr}, 0, l, nil)
+       require.NoError(t, err)
+
+       // Verify part is still pending
+       pending, err := controller2.listPendingForNode(nodeAddr)
+       require.NoError(t, err)
+       assert.Len(t, pending, 1)
+       assert.Equal(t, partID, pending[0].PartID)
+       assert.Equal(t, PartTypeCore, pending[0].PartType)
+}
+
+func TestSanitizeNodeAddr(t *testing.T) {
+       tests := []struct {
+               input    string
+               expected string
+       }{
+               {"node1.example.com:17912", "node1.example.com_17912"},
+               {"node2:8080", "node2_8080"},
+               {"192.168.1.1:9999", "192.168.1.1_9999"},
+               {"node/with/slashes", "node_with_slashes"},
+               {"node\\with\\backslashes", "node_with_backslashes"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.input, func(t *testing.T) {
+                       result := sanitizeNodeAddr(tt.input)
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestParsePartID(t *testing.T) {
+       tests := []struct {
+               name      string
+               input     string
+               expected  uint64
+               expectErr bool
+       }{
+               {"valid hex", "000000000000001a", 0x1a, false},
+               {"valid hex upper", "000000000000001A", 0x1a, false},
+               {"simple hex", "1a", 0x1a, false},
+               {"invalid hex", "xyz", 0, true},
+               {"empty string", "", 0, true},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result, err := parsePartID(tt.input)
+                       if tt.expectErr {
+                               assert.Error(t, err)
+                       } else {
+                               assert.NoError(t, err)
+                               assert.Equal(t, tt.expected, result)
+                       }
+               })
+       }
+}
+
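parsePartID is the inverse of partName, which the fixtures above use to build the hex-named part directories. The round-trip check below is illustrative only, not part of this commit, and assumes partName emits zero-padded lowercase hex such as "000000000000001a".

// Illustrative only: directory names produced by partName parse back to the same ID.
func TestPartNameRoundTrip(t *testing.T) {
	id := uint64(0x1a)
	name := partName(id) // e.g. "000000000000001a"
	back, err := parsePartID(name)
	require.NoError(t, err)
	assert.Equal(t, id, back)
}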
+// TestHandoffController_SizeEnforcement verifies that the handoff queue rejects enqueues when the size limit is exceeded.
+func TestHandoffController_SizeEnforcement(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       // Create a mock part with metadata
+       partID := uint64(100)
+       partPath := filepath.Join(tempDir, "source", partName(partID))
+       err := os.MkdirAll(partPath, 0o755)
+       tester.NoError(err)
+
+       // Create metadata.json with CompressedSizeBytes = 5MB
+       metadata := map[string]interface{}{
+               "compressedSizeBytes": 5 * 1024 * 1024, // 5MB
+       }
+       metadataBytes, err := json.Marshal(metadata)
+       tester.NoError(err)
+       err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+       tester.NoError(err)
+
+       // Create a dummy file
+       err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+       tester.NoError(err)
+
+       // Create handoff controller with 10MB limit
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912", "node2:17912"}
+       hc, err := newHandoffController(lfs, tempDir, nil, dataNodes, 
10*megabyte, l, nil) // 10MB limit
+       tester.NoError(err)
+       defer hc.close()
+
+       // First enqueue should succeed (5MB < 10MB)
+       err = hc.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, 
"group1", 1)
+       tester.NoError(err)
+
+       // Check total size
+       totalSize := hc.getTotalSize()
+       tester.Equal(uint64(5*1024*1024), totalSize)
+
+       // Second enqueue should succeed (total reaches the 10MB limit exactly)
+       err = hc.enqueueForNode("node1:17912", partID+1, PartTypeCore, 
partPath, "group1", 1)
+       tester.NoError(err)
+
+       // Check total size
+       totalSize = hc.getTotalSize()
+       tester.Equal(uint64(10*1024*1024), totalSize)
+
+       // Third enqueue should fail (15MB > 10MB)
+       err = hc.enqueueForNode("node1:17912", partID+2, PartTypeCore, 
partPath, "group1", 1)
+       tester.Error(err)
+       tester.Contains(err.Error(), "handoff queue full")
+
+       // Total size should still be 10MB
+       totalSize = hc.getTotalSize()
+       tester.Equal(uint64(10*1024*1024), totalSize)
+}
+
+// TestHandoffController_SizeTracking verifies that total size is correctly updated on enqueue and complete.
+func TestHandoffController_SizeTracking(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       // Create a mock part with metadata
+       partID := uint64(200)
+       partPath := filepath.Join(tempDir, "source", partName(partID))
+       err := os.MkdirAll(partPath, 0o755)
+       tester.NoError(err)
+
+       // Create metadata.json with CompressedSizeBytes = 3MB
+       metadata := map[string]interface{}{
+               "compressedSizeBytes": 3 * 1024 * 1024, // 3MB
+       }
+       metadataBytes, err := json.Marshal(metadata)
+       tester.NoError(err)
+       err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+       tester.NoError(err)
+
+       // Create a dummy file
+       err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+       tester.NoError(err)
+
+       // Create handoff controller with 100MB limit
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912"}
+       hc, err := newHandoffController(lfs, tempDir, nil, dataNodes, 
100*megabyte, l, nil) // 100MB limit
+       tester.NoError(err)
+       defer hc.close()
+
+       // Initial size should be 0
+       tester.Equal(uint64(0), hc.getTotalSize())
+
+       // Enqueue first part
+       err = hc.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, 
"group1", 1)
+       tester.NoError(err)
+       tester.Equal(uint64(3*1024*1024), hc.getTotalSize())
+
+       // Enqueue second part
+       err = hc.enqueueForNode("node1:17912", partID+1, PartTypeCore, 
partPath, "group1", 1)
+       tester.NoError(err)
+       tester.Equal(uint64(6*1024*1024), hc.getTotalSize())
+
+       // Complete first part
+       err = hc.completeSend("node1:17912", partID, PartTypeCore)
+       tester.NoError(err)
+       tester.Equal(uint64(3*1024*1024), hc.getTotalSize())
+
+       // Complete second part
+       err = hc.completeSend("node1:17912", partID+1, PartTypeCore)
+       tester.NoError(err)
+       tester.Equal(uint64(0), hc.getTotalSize())
+}
+
+func TestHandoffController_NodeQueueHelpers(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       const (
+               partID = uint64(400)
+               node   = "node1:17912"
+       )
+
+       partPath := filepath.Join(tempDir, "source", partName(partID))
+       err := os.MkdirAll(partPath, 0o755)
+       tester.NoError(err)
+
+       metadata := map[string]interface{}{
+               "compressedSizeBytes": 1024,
+       }
+       metadataBytes, err := json.Marshal(metadata)
+       tester.NoError(err)
+       err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+       tester.NoError(err)
+       err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("helper test"), 0o600)
+       tester.NoError(err)
+
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       hc, err := newHandoffController(lfs, tempDir, nil, []string{node}, 
10*megabyte, l, nil)
+       tester.NoError(err)
+       defer hc.close()
+
+       err = hc.enqueueForNode(node, partID, PartTypeCore, partPath, "group1", 
1)
+       tester.NoError(err)
+
+       nodes := hc.getAllNodeQueues()
+       tester.Contains(nodes, node)
+
+       queueSize, err := hc.getNodeQueueSize(node)
+       tester.NoError(err)
+       tester.Greater(queueSize, uint64(0))
+
+       err = hc.completeSendAll(node, partID)
+       tester.NoError(err)
+
+       queueSize, err = hc.getNodeQueueSize(node)
+       tester.NoError(err)
+       tester.Equal(uint64(0), queueSize)
+}
+
+func TestHandoffController_FiltersNonOwningOfflineNodes(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912", "node2:17912"}
+       const groupName = "group1"
+       const shardID uint32 = 0
+
+       resolver := func(group string, shard uint32) ([]string, error) {
+               tester.Equal(groupName, group)
+               tester.Equal(shardID, shard)
+               return []string{"node2:17912"}, nil
+       }
+
+       queueClient := &fakeQueueClient{healthy: dataNodes}
+       hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 
0, l, resolver)
+       tester.NoError(err)
+       defer hc.close()
+
+       offline := hc.calculateOfflineNodes([]string{"node2:17912"}, groupName, 
common.ShardID(shardID))
+       tester.Len(offline, 0, "expected no offline nodes when shard owner is online")
+
+       partDir := filepath.Join(tempDir, "part-core")
+       tester.NoError(os.MkdirAll(partDir, 0o755))
+
+       coreParts := []partInfo{{
+               partID:  uint64(1),
+               path:    partDir,
+               group:   groupName,
+               shardID: common.ShardID(shardID),
+       }}
+
+       err = hc.enqueueForOfflineNodes([]string{"node1:17912"}, coreParts, nil)
+       tester.NoError(err)
+
+       nodeDir := filepath.Join(hc.root, sanitizeNodeAddr("node1:17912"))
+       _, statErr := os.Stat(nodeDir)
+       tester.Error(statErr)
+       tester.True(os.IsNotExist(statErr), "expected no queue directory to be created for non-owning node")
+}
+
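The resolver injected above has the shape func(group string, shard uint32) ([]string, error) and tells the controller which nodes own a shard, so parts are only queued for owners that are actually offline. The sketch below shows one way such a resolver could be built from a static table; it is illustrative only, not part of this commit, newStaticShardResolver is a hypothetical name, and it assumes a "fmt" import.

// Illustrative only: a resolver with the same shape as the one used in
// TestHandoffController_FiltersNonOwningOfflineNodes, backed by a fixed table.
func newStaticShardResolver(owners map[string]map[uint32][]string) func(group string, shard uint32) ([]string, error) {
	return func(group string, shard uint32) ([]string, error) {
		shards, ok := owners[group]
		if !ok {
			return nil, fmt.Errorf("unknown group %q", group)
		}
		return shards[shard], nil
	}
}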
+// TestHandoffController_SizeRecovery verifies that total size is correctly calculated on restart.
+func TestHandoffController_SizeRecovery(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       // Create a mock part with metadata
+       partID := uint64(300)
+       partPath := filepath.Join(tempDir, "source", partName(partID))
+       err := os.MkdirAll(partPath, 0o755)
+       tester.NoError(err)
+
+       // Create metadata.json with CompressedSizeBytes = 7MB
+       metadata := map[string]interface{}{
+               "compressedSizeBytes": 7 * 1024 * 1024, // 7MB
+       }
+       metadataBytes, err := json.Marshal(metadata)
+       tester.NoError(err)
+       err = os.WriteFile(filepath.Join(partPath, "metadata.json"), metadataBytes, 0o600)
+       tester.NoError(err)
+
+       // Create a dummy file
+       err = os.WriteFile(filepath.Join(partPath, "data.bin"), []byte("test data"), 0o600)
+       tester.NoError(err)
+
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912", "node2:17912"}
+
+       // First controller: enqueue some parts
+       hc1, err := newHandoffController(lfs, tempDir, nil, dataNodes, 
100*megabyte, l, nil)
+       tester.NoError(err)
+
+       err = hc1.enqueueForNode("node1:17912", partID, PartTypeCore, partPath, 
"group1", 1)
+       tester.NoError(err)
+       err = hc1.enqueueForNode("node2:17912", partID+1, PartTypeCore, 
partPath, "group1", 1)
+       tester.NoError(err)
+
+       // Total size should be 14MB (7MB * 2 parts)
+       expectedSize := uint64(14 * 1024 * 1024)
+       tester.Equal(expectedSize, hc1.getTotalSize())
+
+       // Close first controller
+       err = hc1.close()
+       tester.NoError(err)
+
+       // Second controller: should recover the same size
+       hc2, err := newHandoffController(lfs, tempDir, nil, dataNodes, 
100*megabyte, l, nil)
+       tester.NoError(err)
+       defer hc2.close()
+
+       // Recovered size should match
+       recoveredSize := hc2.getTotalSize()
+       tester.Equal(expectedSize, recoveredSize)
+
+       // Should have both nodes with pending parts
+       node1Pending, err := hc2.listPendingForNode("node1:17912")
+       tester.NoError(err)
+       tester.Len(node1Pending, 1)
+
+       node2Pending, err := hc2.listPendingForNode("node2:17912")
+       tester.NoError(err)
+       tester.Len(node2Pending, 1)
+}
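The size tests above pin down the capacity rule: the running total is the sum of each queued part's compressedSizeBytes read from metadata.json, a limit of 0 disables the check, and an enqueue may bring the total exactly to the limit but not past it. The sketch below restates that gate; it is illustrative only, the committed check lives in handoff_controller.go, checkCapacity is a hypothetical name, and it assumes a "fmt" import.

// Illustrative only: the capacity rule exercised by TestHandoffController_SizeEnforcement.
func checkCapacity(currentTotal, incomingPart, limit uint64) error {
	if limit == 0 {
		return nil // several tests pass 0, which behaves as "no limit"
	}
	if currentTotal+incomingPart > limit {
		return fmt.Errorf("handoff queue full: %d + %d bytes exceeds limit of %d", currentTotal, incomingPart, limit)
	}
	return nil // reaching the limit exactly is still accepted
}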
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 9849d81f..cf435393 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -375,6 +376,7 @@ type queueSupplier struct {
        traceDataNodeRegistry grpc.NodeRegistry
        l                     *logger.Logger
        schemaRepo            *schemaRepo
+       handoffCtrl           *handoffController
        path                  string
        option                option
 }
@@ -398,6 +400,7 @@ func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.Node
                path:                  path,
                option:                opt,
                schemaRepo:            &svc.schemaRepo,
+               handoffCtrl:           svc.handoffCtrl,
        }
 }
 
@@ -431,7 +434,11 @@ func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB,
                Location:        path.Join(qs.path, group),
                Option:          qs.option,
                Metrics:         qs.newMetrics(p),
-               SubQueueCreator: newWriteQueue,
+               SubQueueCreator: func(fileSystem fs.FileSystem, root string, position common.Position,
+                       l *logger.Logger, option option, metrics any, group string, shardID common.ShardID, getNodes func() []string,
+               ) (*tsTable, error) {
+                       return newWriteQueue(fileSystem, root, position, l, option, metrics, group, shardID, getNodes, qs.handoffCtrl)
+               },
                GetNodes: func(shardID common.ShardID) []string {
                        copies := ro.Replicas + 1
                        nodeSet := make(map[string]struct{}, copies)
diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go
index f361ded5..a36ae9e5 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -79,7 +79,8 @@ func (f *fakeSIDX) Merge(<-chan struct{}, map[uint64]struct{}, uint64) (*sidx.Me
 func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, string, uint32, string) ([]queue.StreamingPartData, []func()) {
        panic("not implemented")
 }
-func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
+func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return map[uint64]string{} }
+func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func()      { return func() {} }
 
 type fakeSIDXWithErr struct {
        *fakeSIDX
@@ -647,6 +648,10 @@ func (f *fakeSIDXInfinite) Merge(<-chan struct{}, map[uint64]struct{}, uint64) (
 func (f *fakeSIDXInfinite) StreamingParts(map[uint64]struct{}, string, uint32, string) ([]queue.StreamingPartData, []func()) {
        panic("not implemented")
 }
+
+func (f *fakeSIDXInfinite) PartPaths(map[uint64]struct{}) map[uint64]string {
+       return map[uint64]string{}
+}
+func (f *fakeSIDXInfinite) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
 
 // TestStreamSIDXTraceBatches_InfiniteChannelContinuesUntilCanceled verifies that
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index b503db77..0a5de970 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -20,8 +20,10 @@ package trace
 import (
        "context"
        "errors"
+       "fmt"
        "path"
        "path/filepath"
+       "sort"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -44,19 +46,22 @@ import (
 )
 
 type liaison struct {
-       pm                  protector.Memory
-       metadata            metadata.Repo
-       pipeline            queue.Server
-       omr                 observability.MetricsRegistry
-       lfs                 fs.FileSystem
-       writeListener       bus.MessageListener
-       dataNodeSelector    node.Selector
-       l                   *logger.Logger
-       schemaRepo          schemaRepo
-       dataPath            string
-       root                string
-       option              option
-       maxDiskUsagePercent int
+       metadata              metadata.Repo
+       pipeline              queue.Server
+       omr                   observability.MetricsRegistry
+       lfs                   fs.FileSystem
+       writeListener         bus.MessageListener
+       dataNodeSelector      node.Selector
+       pm                    protector.Memory
+       handoffCtrl           *handoffController
+       l                     *logger.Logger
+       schemaRepo            schemaRepo
+       dataPath              string
+       root                  string
+       dataNodeList          []string
+       option                option
+       maxDiskUsagePercent   int
+       handoffMaxSizePercent int
 }
 
 var _ Service = (*liaison)(nil)
@@ -89,6 +94,12 @@ func (l *liaison) FlagSet() *run.FlagSet {
        fs.DurationVar(&l.option.flushTimeout, "trace-flush-timeout", 
3*time.Second, "the timeout for trace data flush")
        fs.IntVar(&l.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, 
"the maximum disk usage percentage")
        fs.DurationVar(&l.option.syncInterval, "trace-sync-interval", 
defaultSyncInterval, "the periodic sync interval for trace data")
+       fs.StringSliceVar(&l.dataNodeList, "data-node-list", nil, 
"comma-separated list of data node names to monitor for handoff")
+       fs.IntVar(&l.handoffMaxSizePercent, "handoff-max-size-percent", 10,
+               "percentage of BanyanDB's allowed disk usage allocated to 
handoff storage. "+
+                       "Calculated as: totalDisk * 
trace-max-disk-usage-percent * handoff-max-size-percent / 10000. "+
+                       "Example: 100GB disk with 95% max usage and 10% handoff 
= 9.5GB; 50% handoff = 47.5GB. "+
+                       "Valid range: 0-100")
        return fs
 }
 
@@ -96,6 +107,17 @@ func (l *liaison) Validate() error {
        if l.root == "" {
                return errEmptyRootPath
        }
+
+       // Validate handoff-max-size-percent is within valid range
+       // Since handoffMaxSizePercent represents the percentage of BanyanDB's allowed disk usage
+       // that goes to handoff storage, it should be between 0-100%
+       if l.handoffMaxSizePercent < 0 || l.handoffMaxSizePercent > 100 {
+               return fmt.Errorf("invalid handoff-max-size-percent: %d%%. Must 
be between 0 and 100. "+
+                       "This represents what percentage of BanyanDB's allowed 
disk usage is allocated to handoff storage. "+
+                       "Example: 100GB disk with 95%% max usage and 50%% 
handoff = 100 * 95%% * 50%% = 47.5GB for handoff",
+                       l.handoffMaxSizePercent)
+       }
+
        return nil
 }
 
@@ -122,6 +144,69 @@ func (l *liaison) PreRun(ctx context.Context) error {
        }
 
        traceDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client, l.dataNodeSelector)
+       // Initialize handoff controller if data nodes are configured
+       l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent", 
l.handoffMaxSizePercent).
+               Msg("handoff configuration")
+       if len(l.dataNodeList) > 0 && l.option.tire2Client != nil {
+               // Calculate max handoff size based on percentage of disk space
+               // Formula: totalDisk * maxDiskUsagePercent * handoffMaxSizePercent / 10000
+               // Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 * 10 / 10000 = 9.5GB
+               maxSize := 0
+               if l.handoffMaxSizePercent > 0 {
+                       l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
+                       totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+                       // Divide after each multiplication to avoid overflow with large disk capacities
+                       maxSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
+                       maxSize = int(maxSizeBytes / 1024 / 1024)
+               }
+
+               // nolint:contextcheck
+               resolveAssignments := func(group string, shardID uint32) ([]string, error) {
+                       if l.metadata == nil {
+                               return nil, fmt.Errorf("metadata repo is not 
initialized")
+                       }
+                       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+                       defer cancel()
+                       groupSchema, err := l.metadata.GroupRegistry().GetGroup(ctx, group)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if groupSchema == nil || groupSchema.ResourceOpts == nil {
+                               return nil, fmt.Errorf("group %s missing 
resource options", group)
+                       }
+                       copies := groupSchema.ResourceOpts.Replicas + 1
+                       if len(l.dataNodeList) == 0 {
+                               return nil, fmt.Errorf("no data nodes 
configured for handoff")
+                       }
+                       sortedNodes := append([]string(nil), l.dataNodeList...)
+                       sort.Strings(sortedNodes)
+                       nodes := make([]string, 0, copies)
+                       seen := make(map[string]struct{}, copies)
+                       for replica := uint32(0); replica < copies; replica++ {
+                               nodeID := sortedNodes[(int(shardID)+int(replica))%len(sortedNodes)]
+                               if _, ok := seen[nodeID]; ok {
+                                       continue
+                               }
+                               nodes = append(nodes, nodeID)
+                               seen[nodeID] = struct{}{}
+                       }
+                       return nodes, nil
+               }
+
+               var err error
+               // nolint:contextcheck
+               l.handoffCtrl, err = newHandoffController(l.lfs, l.dataPath, l.option.tire2Client, l.dataNodeList, maxSize, l.l, resolveAssignments)
+               if err != nil {
+                       return err
+               }
+               l.l.Info().
+                       Int("dataNodes", len(l.dataNodeList)).
+                       Int("maxSize", maxSize).
+                       Int("maxSizePercent", l.handoffMaxSizePercent).
+                       Int("diskUsagePercent", l.maxDiskUsagePercent).
+                       Msg("handoff controller initialized")
+       }
+
        l.schemaRepo = newLiaisonSchemaRepo(l.dataPath, l, traceDataNodeRegistry)
        l.writeListener = setUpWriteQueueCallback(l.l, &l.schemaRepo, 
l.maxDiskUsagePercent, l.option.tire2Client)
 
@@ -140,6 +225,11 @@ func (l *liaison) Serve() run.StopNotify {
 }
 
 func (l *liaison) GracefulStop() {
+       if l.handoffCtrl != nil {
+               if err := l.handoffCtrl.close(); err != nil {
+                       l.l.Warn().Err(err).Msg("failed to close handoff 
controller")
+               }
+       }
        if l.schemaRepo.Repository != nil {
                l.schemaRepo.Repository.Close()
        }
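For the sizing logic in PreRun above, the following self-contained check reproduces the documented arithmetic (divide after each multiplication) using the numbers from the flag's help text. The figures are illustrative only, not values read from a running node.

package main

import "fmt"

func main() {
	totalSpace := uint64(100) * 1024 * 1024 * 1024 // 100GB disk, as in the flag's example
	maxDiskUsagePercent := uint64(95)
	handoffMaxSizePercent := uint64(10)

	// Same operation order as the code above: divide after each multiplication
	// so intermediate products stay small even for very large disks.
	maxSizeBytes := totalSpace * maxDiskUsagePercent / 100 * handoffMaxSizePercent / 100
	maxSizeMB := maxSizeBytes / 1024 / 1024

	fmt.Println(maxSizeMB) // 9728, i.e. a 9.5GB handoff budget
}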
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 8dd8fea7..02d3ded7 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -294,6 +294,10 @@ func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync map[
                        return err
                }
        }
+
+       // After sync attempts, enqueue parts for offline nodes
+       tst.enqueueForOfflineNodes(nodes, partsToSync, partIDsToSync)
+
        return nil
 }
 
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 4a9645eb..2d83dc25 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -56,6 +56,7 @@ type tsTable struct {
        snapshot      *snapshot
        loopCloser    *run.Closer
        getNodes      func() []string
+       handoffCtrl   *handoffController
        option        option
        introductions chan *introduction
        p             common.Position
@@ -337,6 +338,62 @@ func (tst *tsTable) getAllSidx() map[string]sidx.SIDX {
        return result
 }
 
+// enqueueForOfflineNodes enqueues parts for offline nodes via the handoff controller.
+func (tst *tsTable) enqueueForOfflineNodes(onlineNodes []string, partsToSync []*part, partIDsToSync map[uint64]struct{}) {
+       if tst.handoffCtrl == nil {
+               return
+       }
+
+       // Check if there are any offline nodes before doing expensive preparation work
+       offlineNodes := tst.handoffCtrl.calculateOfflineNodes(onlineNodes, tst.group, tst.shardID)
+       tst.l.Debug().
+               Str("group", tst.group).
+               Uint32("shardID", uint32(tst.shardID)).
+               Strs("onlineNodes", onlineNodes).
+               Strs("offlineNodes", offlineNodes).
+               Msg("handoff enqueue evaluation")
+       if len(offlineNodes) == 0 {
+               return
+       }
+
+       // Prepare core parts info
+       coreParts := make([]partInfo, 0, len(partsToSync))
+       for _, part := range partsToSync {
+               coreParts = append(coreParts, partInfo{
+                       partID:  part.partMetadata.ID,
+                       path:    part.path,
+                       group:   tst.group,
+                       shardID: tst.shardID,
+               })
+       }
+
+       // Get sidx part paths from each sidx instance
+       sidxMap := tst.getAllSidx()
+       sidxParts := make(map[string][]partInfo)
+       for sidxName, sidxInstance := range sidxMap {
+               partPaths := sidxInstance.PartPaths(partIDsToSync)
+               if len(partPaths) == 0 {
+                       continue
+               }
+
+               parts := make([]partInfo, 0, len(partPaths))
+               for partID, path := range partPaths {
+                       parts = append(parts, partInfo{
+                               partID:  partID,
+                               path:    path,
+                               group:   tst.group,
+                               shardID: tst.shardID,
+                       })
+               }
+               sidxParts[sidxName] = parts
+       }
+
+       // Call handoff controller with offline nodes
+       if err := tst.handoffCtrl.enqueueForOfflineNodes(offlineNodes, coreParts, sidxParts); err != nil {
+               tst.l.Warn().Err(err).Msg("handoff enqueue completed with 
errors")
+       }
+}
+
 func (tst *tsTable) loadSidxMap(availablePartIDs []uint64) {
        tst.sidxMap = make(map[string]sidx.SIDX)
        sidxRootPath := filepath.Join(tst.root, sidxDirName)
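A note on how enqueueForOfflineNodes above consumes PartPaths: it only enqueues the part IDs that each sidx instance actually returns, and skips instances that return nothing. The toy implementation below is a sketch of one way a PartPaths method could behave under that usage; toySIDX and knownParts are invented for the example and are not the real sidx code.

package main

import "fmt"

// toySIDX keeps an in-memory map of partID -> on-disk path; a real implementation
// would presumably derive these paths from its current snapshot of parts.
type toySIDX struct {
	knownParts map[uint64]string
}

// PartPaths returns paths only for the requested IDs it knows about, keyed by ID;
// unknown partIDs simply do not appear in the result.
func (s *toySIDX) PartPaths(partIDs map[uint64]struct{}) map[uint64]string {
	out := make(map[uint64]string, len(partIDs))
	for id := range partIDs {
		if p, ok := s.knownParts[id]; ok {
			out[id] = p
		}
	}
	return out
}

func main() {
	s := &toySIDX{knownParts: map[uint64]string{1: "/tmp/sidx/000001", 2: "/tmp/sidx/000002"}}
	got := s.PartPaths(map[uint64]struct{}{1: {}, 3: {}})
	fmt.Println(got) // map[1:/tmp/sidx/000001]; ID 3 is unknown and omitted
}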
diff --git a/banyand/trace/wqueue.go b/banyand/trace/wqueue.go
index ef5584dd..bbc5bcfe 100644
--- a/banyand/trace/wqueue.go
+++ b/banyand/trace/wqueue.go
@@ -34,11 +34,13 @@ type NodeSelector interface {
 // newWriteQueue is like newTSTable but does not start the merge loop (or any background loops).
 func newWriteQueue(fileSystem fs.FileSystem, rootPath string, p common.Position,
        l *logger.Logger, option option, m any, group string, shardID common.ShardID, getNodes func() []string,
+       handoffCtrl *handoffController,
 ) (*tsTable, error) {
        t, epoch := initTSTable(fileSystem, rootPath, p, l, option, m)
        t.getNodes = getNodes
        t.group = group
        t.shardID = shardID
+       t.handoffCtrl = handoffCtrl
        t.startLoopWithConditionalMerge(epoch)
        return t, nil
 }
diff --git a/banyand/trace/wqueue_test.go b/banyand/trace/wqueue_test.go
index 02c20d2a..b93f612e 100644
--- a/banyand/trace/wqueue_test.go
+++ b/banyand/trace/wqueue_test.go
@@ -74,6 +74,7 @@ func Test_newWriteQueue(t *testing.T) {
                "test-group",
                common.ShardID(1),
                func() []string { return []string{"node1", "node2"} },
+               nil, // handoffCtrl
        )
        require.NoError(t, err)
        require.NotNil(t, tst)
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 154c7943..57d46083 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -120,6 +120,8 @@ type FileSystem interface {
        SyncPath(path string)
        // MustGetFreeSpace returns the free space of the file system.
        MustGetFreeSpace(path string) uint64
+       // MustGetTotalSpace returns the total space of the file system.
+       MustGetTotalSpace(path string) uint64
        // CreateHardLink creates hard links in destPath for files in srcPath that pass the filter.
        CreateHardLink(srcPath, destPath string, filter func(string) bool) error
 }
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 49f24f7b..451f46c1 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -301,6 +301,14 @@ func (fs *localFileSystem) MustGetFreeSpace(path string) uint64 {
        return usage.Free
 }
 
+func (fs *localFileSystem) MustGetTotalSpace(path string) uint64 {
+       usage, err := disk.Usage(path)
+       if err != nil {
+               fs.logger.Panic().Str("path", path).Err(err).Msg("failed to get 
disk usage")
+       }
+       return usage.Total
+}
+
 func (fs *localFileSystem) CreateHardLink(srcPath, destPath string, filter func(string) bool) error {
        fi, err := os.Stat(srcPath)
        if err != nil {
diff --git a/test/integration/handoff/handoff_suite_test.go b/test/integration/handoff/handoff_suite_test.go
new file mode 100644
index 00000000..988720ca
--- /dev/null
+++ b/test/integration/handoff/handoff_suite_test.go
@@ -0,0 +1,584 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handoff_test
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "hash/fnv"
+       "io"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+const (
+       nodeHost        = "127.0.0.1"
+       grpcHost        = "127.0.0.1"
+       traceShardCount = 2
+)
+
+type dataNodeHandle struct {
+       closeFn    func()
+       dataDir    string
+       addr       string
+       grpcPort   int
+       gossipPort int
+}
+
+func newDataNodeHandle(dataDir string, grpcPort, gossipPort int) 
*dataNodeHandle {
+       return &dataNodeHandle{
+               dataDir:    dataDir,
+               grpcPort:   grpcPort,
+               gossipPort: gossipPort,
+               addr:       fmt.Sprintf("%s:%d", nodeHost, grpcPort),
+       }
+}
+
+func (h *dataNodeHandle) start(etcdEndpoint string) {
+       Expect(h.closeFn).To(BeNil())
+       args := []string{
+               "data",
+               "--grpc-host=" + grpcHost,
+               fmt.Sprintf("--grpc-port=%d", h.grpcPort),
+               fmt.Sprintf("--property-repair-gossip-grpc-port=%d", 
h.gossipPort),
+               "--stream-root-path=" + h.dataDir,
+               "--measure-root-path=" + h.dataDir,
+               "--property-root-path=" + h.dataDir,
+               "--trace-root-path=" + h.dataDir,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost,
+               "--node-labels", "type=handoff",
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
+               "--measure-flush-timeout=0s",
+               "--stream-flush-timeout=0s",
+               "--trace-flush-timeout=0s",
+       }
+       h.closeFn = setup.CMD(args...)
+
+       Eventually(helpers.HealthCheck(fmt.Sprintf("%s:%d", grpcHost, 
h.grpcPort), 10*time.Second, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials())), 
flags.EventuallyTimeout).Should(Succeed())
+
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (h *dataNodeHandle) stop(etcdEndpoint string) {
+       if h.closeFn != nil {
+               h.closeFn()
+               h.closeFn = nil
+       }
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (h *dataNodeHandle) etcdKey() string {
+       return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, 
nodeHost, h.grpcPort)
+}
+
+type liaisonHandle struct {
+       closeFn    func()
+       rootPath   string
+       addr       string
+       httpAddr   string
+       grpcPort   int
+       httpPort   int
+       serverPort int
+}
+
+func newLiaisonHandle(rootPath string, grpcPort, httpPort, serverPort int) 
*liaisonHandle {
+       return &liaisonHandle{
+               rootPath:   rootPath,
+               grpcPort:   grpcPort,
+               httpPort:   httpPort,
+               serverPort: serverPort,
+               addr:       fmt.Sprintf("%s:%d", grpcHost, grpcPort),
+               httpAddr:   fmt.Sprintf("%s:%d", grpcHost, httpPort),
+       }
+}
+
+func (l *liaisonHandle) start(etcdEndpoint string, dataNodes []string) {
+       Expect(l.closeFn).To(BeNil())
+       joined := strings.Join(dataNodes, ",")
+       Expect(os.Setenv("BYDB_DATA_NODE_LIST", joined)).To(Succeed())
+       Expect(os.Setenv("BYDB_DATA_NODE_SELECTOR", 
"type=handoff")).To(Succeed())
+       args := []string{
+               "liaison",
+               "--grpc-host=" + grpcHost,
+               fmt.Sprintf("--grpc-port=%d", l.grpcPort),
+               "--http-host=" + grpcHost,
+               fmt.Sprintf("--http-port=%d", l.httpPort),
+               "--liaison-server-grpc-host=" + grpcHost,
+               fmt.Sprintf("--liaison-server-grpc-port=%d", l.serverPort),
+               "--http-grpc-addr=" + l.addr,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost,
+               "--stream-root-path=" + l.rootPath,
+               "--measure-root-path=" + l.rootPath,
+               "--trace-root-path=" + l.rootPath,
+               "--stream-flush-timeout=500ms",
+               "--measure-flush-timeout=500ms",
+               "--trace-flush-timeout=500ms",
+               "--stream-sync-interval=1s",
+               "--measure-sync-interval=1s",
+               "--trace-sync-interval=1s",
+               "--handoff-max-size-percent=100",
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
+       }
+
+       l.closeFn = setup.CMD(args...)
+
+       Eventually(helpers.HTTPHealthCheck(l.httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (l *liaisonHandle) stop(etcdEndpoint string) {
+       if l.closeFn != nil {
+               l.closeFn()
+               l.closeFn = nil
+       }
+       _ = os.Unsetenv("BYDB_DATA_NODE_LIST")
+       _ = os.Unsetenv("BYDB_DATA_NODE_SELECTOR")
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (l *liaisonHandle) etcdKey() string {
+       return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, 
nodeHost, l.serverPort)
+}
+
+type suiteInfo struct {
+       LiaisonAddr  string   `json:"liaison_addr"`
+       EtcdEndpoint string   `json:"etcd_endpoint"`
+       HandoffRoot  string   `json:"handoff_root"`
+       DataNodes    []string `json:"data_nodes"`
+}
+
+func keysToSet[K comparable, V any](m map[K]V) map[K]struct{} {
+       if m == nil {
+               return nil
+       }
+       out := make(map[K]struct{}, len(m))
+       for k := range m {
+               out[k] = struct{}{}
+       }
+       return out
+}
+
+var (
+       connection   *grpc.ClientConn
+       goods        []gleak.Goroutine
+       cleanupFunc  func()
+       handoffRoot  string
+       etcdEndpoint string
+
+       dnHandles [2]*dataNodeHandle
+       liaison   *liaisonHandle
+
+       etcdServer   embeddedetcd.Server
+       etcdSpaceDef func()
+       liaisonDef   func()
+       dataDefs     []func()
+)
+
+func TestHandoff(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Trace Handoff Suite")
+}
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{Env: "dev", Level: 
flags.LogLevel})).To(Succeed())
+       goods = gleak.Goroutines()
+
+       etcdPorts, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+
+       var etcdDir string
+       etcdDir, etcdSpaceDef, err = test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+
+       clientEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[0])
+       peerEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[1])
+
+       etcdServer, err = embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{clientEP}, 
[]string{peerEP}),
+               embeddedetcd.RootDir(etcdDir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       <-etcdServer.ReadyNotify()
+       etcdEndpoint = clientEP
+
+       registry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{clientEP}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer registry.Close()
+       Expect(testtrace.PreloadSchema(context.Background(), 
registry)).To(Succeed())
+
+       dataDefs = make([]func(), 0, 2)
+       for i := range dnHandles {
+               dataDir, def, errDir := test.NewSpace()
+               Expect(errDir).NotTo(HaveOccurred())
+               dataDefs = append(dataDefs, def)
+               ports, errPorts := test.AllocateFreePorts(2)
+               Expect(errPorts).NotTo(HaveOccurred())
+               dnHandles[i] = newDataNodeHandle(dataDir, ports[0], ports[1])
+               dnHandles[i].start(etcdEndpoint)
+       }
+
+       liaisonPath, def, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       liaisonDef = def
+       liaisonPorts, err := test.AllocateFreePorts(3)
+       Expect(err).NotTo(HaveOccurred())
+       liaison = newLiaisonHandle(liaisonPath, liaisonPorts[0], 
liaisonPorts[1], liaisonPorts[2])
+       nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+       liaison.start(etcdEndpoint, nodeAddrs)
+
+       handoffRoot = filepath.Join(liaisonPath, "trace", "data", "handoff", 
"nodes")
+
+       cleanupFunc = func() {
+               if liaison != nil {
+                       liaison.stop(etcdEndpoint)
+               }
+               for i := range dnHandles {
+                       if dnHandles[i] != nil {
+                               dnHandles[i].stop(etcdEndpoint)
+                       }
+               }
+               if etcdServer != nil {
+                       _ = etcdServer.Close()
+               }
+               if liaisonDef != nil {
+                       liaisonDef()
+               }
+               for _, def := range dataDefs {
+                       if def != nil {
+                               def()
+                       }
+               }
+               if etcdSpaceDef != nil {
+                       etcdSpaceDef()
+               }
+       }
+
+       info := suiteInfo{
+               LiaisonAddr:  liaison.addr,
+               EtcdEndpoint: etcdEndpoint,
+               HandoffRoot:  handoffRoot,
+               DataNodes:    nodeAddrs,
+       }
+       payload, err := json.Marshal(info)
+       Expect(err).NotTo(HaveOccurred())
+       return payload
+}, func(data []byte) {
+       var info suiteInfo
+       Expect(json.Unmarshal(data, &info)).To(Succeed())
+       etcdEndpoint = info.EtcdEndpoint
+       handoffRoot = info.HandoffRoot
+       if liaison == nil {
+               liaison = &liaisonHandle{addr: info.LiaisonAddr}
+       } else {
+               liaison.addr = info.LiaisonAddr
+       }
+
+       var err error
+       connection, err = grpchelper.Conn(info.LiaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Trace Handoff Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if cleanupFunc != nil {
+                       cleanupFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       }
+})
+
+var _ = Describe("trace handoff", func() {
+       It("replays data to recovered nodes and empties the queue", func() {
+               Expect(connection).NotTo(BeNil())
+
+               nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+               var (
+                       targetIndex = -1
+                       traceID     string
+                       writeTime   time.Time
+                       targetShard uint32
+               )
+
+               for idx := range dnHandles {
+                       candidateShard := shardIDForNode(dnHandles[idx].addr, 
nodeAddrs)
+                       candidateTraceID := 
generateTraceIDForShard(candidateShard, traceShardCount)
+
+                       By(fmt.Sprintf("ensuring the handoff queue for %s 
starts empty", dnHandles[idx].addr))
+                       Expect(countPendingParts(dnHandles[idx].addr)).To(Equal(0))
+
+                       By(fmt.Sprintf("stopping %s for shard %d", 
dnHandles[idx].addr, candidateShard))
+                       dnHandles[idx].stop(etcdEndpoint)
+                       time.Sleep(7 * time.Second)
+
+                       candidateWriteTime := writeTrace(connection, 
candidateTraceID)
+                       found := waitForPendingParts(dnHandles[idx].addr, 
flags.EventuallyTimeout)
+                       if found {
+                               targetIndex = idx
+                               traceID = candidateTraceID
+                               writeTime = candidateWriteTime
+                               targetShard = candidateShard
+                               break
+                       }
+
+                       By(fmt.Sprintf("no handoff data detected for %s, 
bringing node back online", dnHandles[idx].addr))
+                       dnHandles[idx].start(etcdEndpoint)
+                       Eventually(func() int {
+                               return countPendingParts(dnHandles[idx].addr)
+                       }, flags.EventuallyTimeout).Should(Equal(0))
+               }
+
+               Expect(targetIndex).NotTo(Equal(-1), "expected to identify a 
shard owner for handoff")
+               targetAddr := dnHandles[targetIndex].addr
+
+               By("restarting the offline node after queueing data")
+               dnHandles[targetIndex].start(etcdEndpoint)
+
+               By("waiting for the queue to drain")
+               Eventually(func() int {
+                       return countPendingParts(targetAddr)
+               }, flags.EventuallyTimeout).Should(Equal(0))
+
+               By("verifying the replayed trace can be queried")
+               Eventually(func() error {
+                       return queryTrace(connection, traceID, writeTime)
+               }, flags.EventuallyTimeout).Should(Succeed())
+
+               var otherIndex int
+               for idx := range dnHandles {
+                       if idx != targetIndex {
+                               otherIndex = idx
+                               break
+                       }
+               }
+
+               By("stopping the other node to ensure the trace resides on the 
recovered node")
+               dnHandles[otherIndex].stop(etcdEndpoint)
+               defer dnHandles[otherIndex].start(etcdEndpoint)
+
+               Eventually(func() error {
+                       return queryTrace(connection, traceID, writeTime)
+               }, flags.EventuallyTimeout).Should(Succeed())
+
+               By(fmt.Sprintf("verified handoff for shard %d via node %s", 
targetShard, targetAddr))
+       })
+})
+
+func writeTrace(conn *grpc.ClientConn, traceID string) time.Time {
+       client := tracev1.NewTraceServiceClient(conn)
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+
+       stream, err := client.Write(ctx)
+       Expect(err).NotTo(HaveOccurred())
+
+       baseTime := time.Now().UTC().Truncate(time.Millisecond)
+
+       tags := []*modelv1.TagValue{
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
traceID}}},
+               {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 1}}},
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"handoff_service"}}},
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"handoff_instance"}}},
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"/handoff"}}},
+               {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 321}}},
+               {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "span-" 
+ traceID}}},
+               {Value: &modelv1.TagValue_Timestamp{Timestamp: 
timestamppb.New(baseTime)}},
+       }
+
+       err = stream.Send(&tracev1.WriteRequest{
+               Metadata: &commonv1.Metadata{Name: "sw", Group: 
"test-trace-group"},
+               Tags:     tags,
+               Span:     []byte("span-" + traceID),
+               Version:  1,
+       })
+       Expect(err).NotTo(HaveOccurred())
+       Expect(stream.CloseSend()).To(Succeed())
+
+       Eventually(func() error {
+               _, err = stream.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+
+       return baseTime
+}
+
+func queryTrace(conn *grpc.ClientConn, traceID string, ts time.Time) error {
+       client := tracev1.NewTraceServiceClient(conn)
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+
+       req := &tracev1.QueryRequest{
+               Groups: []string{"test-trace-group"},
+               Name:   "sw",
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(ts.Add(-5 * time.Minute)),
+                       End:   timestamppb.New(ts.Add(5 * time.Minute)),
+               },
+               Criteria: &modelv1.Criteria{
+                       Exp: &modelv1.Criteria_Condition{
+                               Condition: &modelv1.Condition{
+                                       Name: "trace_id",
+                                       Op:   modelv1.Condition_BINARY_OP_EQ,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{
+                                               Str: &modelv1.Str{Value: 
traceID},
+                                       }},
+                               },
+                       },
+               },
+               TagProjection: []string{"trace_id"},
+       }
+
+       resp, err := client.Query(ctx, req)
+       if err != nil {
+               return err
+       }
+       if len(resp.GetTraces()) == 0 {
+               return fmt.Errorf("trace %s not found", traceID)
+       }
+       return nil
+}
+
+func countPendingParts(nodeAddr string) int {
+       sanitized := sanitizeNodeAddr(nodeAddr)
+       nodeDir := filepath.Join(handoffRoot, sanitized)
+
+       entries, err := os.ReadDir(nodeDir)
+       if err != nil {
+               if errors.Is(err, os.ErrNotExist) {
+                       return 0
+               }
+               Fail(fmt.Sprintf("failed to read handoff directory %s: %v", 
nodeDir, err))
+       }
+
+       count := 0
+       for _, entry := range entries {
+               if entry.IsDir() {
+                       count++
+               }
+       }
+       return count
+}
+
+func sanitizeNodeAddr(addr string) string {
+       replacer := strings.NewReplacer(":", "_", "/", "_", "\\", "_")
+       return replacer.Replace(addr)
+}
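// Aside (not part of the commit): countPendingParts and sanitizeNodeAddr above
// assume a per-node layout under the handoff root, with one subdirectory per
// pending part and ":" in the node address replaced by "_". The standalone
// snippet below only demonstrates that path construction; the root value is an
// invented example mirroring the suite's filepath.Join call.
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

func main() {
	handoffRoot := "/tmp/liaison/trace/data/handoff/nodes" // example root only
	nodeAddr := "127.0.0.1:17912"

	replacer := strings.NewReplacer(":", "_", "/", "_", "\\", "_")
	nodeDir := filepath.Join(handoffRoot, replacer.Replace(nodeAddr))

	// Each pending part would appear as a subdirectory of nodeDir.
	fmt.Println(nodeDir) // /tmp/liaison/trace/data/handoff/nodes/127.0.0.1_17912
}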
+
+func computeShardForTraceID(traceID string, shardCount uint32) uint32 {
+       hasher := fnv.New32a()
+       _, _ = hasher.Write([]byte(traceID))
+       return hasher.Sum32() % shardCount
+}
+
+func generateTraceIDForShard(shardID uint32, shardCount uint32) string {
+       for i := 0; i < 10000; i++ {
+               candidate := fmt.Sprintf("handoff-trace-%d-%d", shardID, i)
+               if computeShardForTraceID(candidate, shardCount) == shardID {
+                       return candidate
+               }
+       }
+       Fail(fmt.Sprintf("failed to generate trace ID for shard %d", shardID))
+       return ""
+}
+
+func shardIDForNode(nodeAddr string, nodeAddrs []string) uint32 {
+       if len(nodeAddrs) == 0 {
+               Fail("no data nodes available to determine shard assignment")
+               return 0
+       }
+       sorted := append([]string(nil), nodeAddrs...)
+       sort.Strings(sorted)
+       for idx, addr := range sorted {
+               if addr == nodeAddr {
+                       return uint32(idx % len(sorted))
+               }
+       }
+       Fail(fmt.Sprintf("node %s not found in data node list %v", nodeAddr, 
nodeAddrs))
+       return 0
+}
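// Aside (not part of the commit): shardIDForNode above mirrors the liaison's
// resolveAssignments closure; both sort the data-node list and then index it
// with modular arithmetic. The sketch below replays that arithmetic for a
// hypothetical three-node list, shard 1 and one replica (copies = 2), so the
// expected handoff targets can be read off directly. Node names are invented.
package main

import (
	"fmt"
	"sort"
)

func main() {
	dataNodeList := []string{"node-b:17912", "node-a:17912", "node-c:17912"} // invented addresses
	copies := 2  // Replicas(1) + 1
	shardID := 1

	sortedNodes := append([]string(nil), dataNodeList...)
	sort.Strings(sortedNodes) // [node-a:17912 node-b:17912 node-c:17912]

	var targets []string
	seen := make(map[string]struct{}, copies)
	for replica := 0; replica < copies; replica++ {
		nodeID := sortedNodes[(shardID+replica)%len(sortedNodes)]
		if _, ok := seen[nodeID]; ok {
			continue
		}
		targets = append(targets, nodeID)
		seen[nodeID] = struct{}{}
	}
	fmt.Println(targets) // [node-b:17912 node-c:17912]: shard 1 hands off to these two nodes
}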
+
+func waitForPendingParts(nodeAddr string, timeout time.Duration) bool {
+       deadline := time.Now().Add(timeout)
+       for {
+               if countPendingParts(nodeAddr) > 0 {
+                       return true
+               }
+               if time.Now().After(deadline) {
+                       return false
+               }
+               time.Sleep(100 * time.Millisecond)
+       }
+}
