This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7e7066eeadf3e3d6f9b079e8ded6fd5a889a440f
Author: Gao Hongtao <[email protected]>
AuthorDate: Sun Aug 31 14:17:23 2025 +0800

    Refactor sidx query handling by consolidating merge functions and removing 
redundant block processing logic.
---
 banyand/internal/sidx/multi_sidx_query.go |   2 +-
 banyand/internal/sidx/query_result.go     |  19 +-
 banyand/internal/sidx/sidx.go             | 292 +++++++++++++++++++++++++++---
 banyand/internal/sidx/sidx_test.go        |  61 +++++--
 banyand/trace/part_iter.go                |  44 ++++-
 banyand/trace/query.go                    |  46 +++--
 banyand/trace/trace.go                    |  14 +-
 banyand/trace/write_standalone.go         |  10 +-
 8 files changed, 409 insertions(+), 79 deletions(-)

diff --git a/banyand/internal/sidx/multi_sidx_query.go 
b/banyand/internal/sidx/multi_sidx_query.go
index 3424792b..d8abca97 100644
--- a/banyand/internal/sidx/multi_sidx_query.go
+++ b/banyand/internal/sidx/multi_sidx_query.go
@@ -169,7 +169,7 @@ func mergeMultipleSIDXResponses(responses []*QueryResponse, 
req QueryRequest) *Q
 
        // Use existing merge functions
        if asc {
-               return mergeQueryResponseShardsAsc(responses, 
req.MaxElementSize)
+               return mergeQueryResponseShards(responses, req.MaxElementSize)
        }
        return mergeQueryResponseShardsDesc(responses, req.MaxElementSize)
 }
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
index 2ab63fec..c035870f 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -39,21 +39,6 @@ type queryResult struct {
        request    QueryRequest
 }
 
-// loadAndProcessBlock loads a block from part and processes it into 
QueryResponse format.
-func (qr *queryResult) loadAndProcessBlock(tmpBlock *block, bs 
blockScanResult, result *QueryResponse) bool {
-       tmpBlock.reset()
-
-       // Load block data from part (similar to stream's loadBlockCursor)
-       if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) {
-               return false
-       }
-
-       // Convert block data to QueryResponse format
-       qr.convertBlockToResponse(tmpBlock, bs.bm.seriesID, result)
-
-       return true
-}
-
 // loadBlockData loads block data from part using block metadata.
 // Uses blockCursor pattern for optimal performance with selective tag loading.
 func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm 
*blockMetadata) bool {
@@ -309,8 +294,8 @@ func (qr *queryResult) extractElementTags(block *block, 
elemIndex int) []Tag {
        return elementTags
 }
 
-// mergeQueryResponseShardsAsc merges multiple QueryResponse shards in 
ascending order.
-func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
+// mergeQueryResponseShards merges multiple QueryResponse shards.
+func mergeQueryResponseShards(shards []*QueryResponse, maxElements int) 
*QueryResponse {
        // Create heap for ascending merge
        qrh := &QueryResponseHeap{asc: true}
 
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index cb6f9722..9a312b9d 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -18,6 +18,7 @@
 package sidx
 
 import (
+       "container/heap"
        "context"
        "math"
        "sort"
@@ -31,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
@@ -225,7 +227,7 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) 
(*QueryResponse, err
                }
        }()
 
-       response := s.executeBlockScannerQuery(ctx, bs, req, asc)
+       response := s.executeBlockScannerQuery(ctx, bs, req)
        if response == nil {
                return &QueryResponse{
                        Keys: make([]int64, 0),
@@ -244,7 +246,7 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) 
(*QueryResponse, err
 }
 
 // executeBlockScannerQuery coordinates the worker pool with block scanner 
following tsResult pattern.
-func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, 
req QueryRequest, asc bool) *QueryResponse {
+func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, 
req QueryRequest) *QueryResponse {
        workerSize := cgroups.CPUs()
        batchCh := make(chan *blockScanResultBatch, workerSize)
 
@@ -290,10 +292,10 @@ func (s *sidx) executeBlockScannerQuery(ctx 
context.Context, bs *blockScanner, r
        workerWg.Wait()
 
        // Merge results from all workers
-       return s.mergeWorkerResults(shards, asc, req.MaxElementSize)
+       return s.mergeWorkerResults(shards, req.MaxElementSize)
 }
 
-// processWorkerBatches processes batches in a worker goroutine.
+// processWorkerBatches processes batches in a worker goroutine using 
heap-based approach.
 func (s *sidx) processWorkerBatches(
        _ int, batchCh chan *blockScanResultBatch, shard *QueryResponse,
        tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory,
@@ -301,6 +303,10 @@ func (s *sidx) processWorkerBatches(
        tmpBlock := generateBlock()
        defer releaseBlock(tmpBlock)
 
+       asc := extractOrdering(req)
+       blockHeap := generateBlockCursorHeap(asc)
+       defer releaseBlockCursorHeap(blockHeap)
+
        for batch := range batchCh {
                if batch.err != nil {
                        shard.Error = batch.err
@@ -308,19 +314,37 @@ func (s *sidx) processWorkerBatches(
                        continue
                }
 
+               // Load all blocks in this batch and create block cursors
                for _, bs := range batch.bss {
-                       if !s.loadAndProcessBlock(tmpBlock, bs, shard, 
tagsToLoad, req, pm) {
-                               // If load fails, continue with next block 
rather than stopping
-                               continue
+                       bc := generateBlockCursor()
+                       bc.init(bs.p, &bs.bm, req)
+
+                       if s.loadBlockCursor(bc, tmpBlock, bs, tagsToLoad, req, 
pm) {
+                               // Set starting index based on sort order
+                               if asc {
+                                       bc.idx = 0
+                               } else {
+                                       bc.idx = len(bc.userKeys) - 1
+                               }
+                               blockHeap.Push(bc)
+                       } else {
+                               releaseBlockCursor(bc)
                        }
                }
 
                releaseBlockScanResultBatch(batch)
        }
+
+       // Initialize heap and merge results
+       if blockHeap.Len() > 0 {
+               heap.Init(blockHeap)
+               result := blockHeap.merge(req.MaxElementSize)
+               shard.CopyFrom(result)
+       }
 }
 
 // mergeWorkerResults merges results from all worker shards with error 
handling.
-func (s *sidx) mergeWorkerResults(shards []*QueryResponse, asc bool, 
maxElementSize int) *QueryResponse {
+func (s *sidx) mergeWorkerResults(shards []*QueryResponse, maxElementSize int) 
*QueryResponse {
        // Check for errors first
        var err error
        for i := range shards {
@@ -333,27 +357,9 @@ func (s *sidx) mergeWorkerResults(shards []*QueryResponse, 
asc bool, maxElementS
                return &QueryResponse{Error: err}
        }
 
-       // Merge results with ordering
-       if asc {
-               return mergeQueryResponseShardsAsc(shards, maxElementSize)
-       }
-       return mergeQueryResponseShardsDesc(shards, maxElementSize)
-}
-
-// loadAndProcessBlock delegates to the queryResult implementation for now.
-func (s *sidx) loadAndProcessBlock(
-       tmpBlock *block, bs blockScanResult, result *QueryResponse,
-       tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory,
-) bool {
-       // Create a temporary queryResult to reuse existing logic
-       qr := &queryResult{
-               request:    req,
-               tagsToLoad: tagsToLoad,
-               pm:         pm,
-               l:          s.l,
-       }
-
-       return qr.loadAndProcessBlock(tmpBlock, bs, result)
+       // Merge results - shards are already in the requested order
+       // Just use ascending merge since shards are pre-sorted
+       return mergeQueryResponseShards(shards, maxElementSize)
 }
 
 // Stats implements SIDX interface.
@@ -526,3 +532,231 @@ func newGC(_ *Options) *gc {
 func (g *gc) clean() {
        // TODO: Implement garbage collection
 }
+
+// blockCursor represents a cursor for iterating through a loaded block, 
similar to query_by_ts.go.
+type blockCursor struct {
+       p        *part
+       bm       *blockMetadata
+       tags     map[string][]Tag
+       userKeys []int64
+       data     [][]byte
+       request  QueryRequest
+       seriesID common.SeriesID
+       idx      int
+}
+
+// init initializes the block cursor.
+func (bc *blockCursor) init(p *part, bm *blockMetadata, req QueryRequest) {
+       bc.p = p
+       bc.bm = bm
+       bc.request = req
+       bc.seriesID = bm.seriesID
+       bc.idx = 0
+}
+
+// loadBlockCursor loads block data into the cursor, similar to 
loadBlockCursor in query_by_ts.go.
+func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock *block, bs 
blockScanResult, tagsToLoad map[string]struct{}, req QueryRequest, pm 
protector.Memory) bool {
+       tmpBlock.reset()
+
+       // Create a temporary queryResult to reuse existing logic
+       qr := &queryResult{
+               request:    req,
+               tagsToLoad: tagsToLoad,
+               pm:         pm,
+               l:          s.l,
+       }
+
+       // Load the block data
+       if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) {
+               return false
+       }
+
+       // Copy data to block cursor
+       bc.userKeys = make([]int64, len(tmpBlock.userKeys))
+       copy(bc.userKeys, tmpBlock.userKeys)
+
+       bc.data = make([][]byte, len(tmpBlock.data))
+       for i, data := range tmpBlock.data {
+               bc.data[i] = make([]byte, len(data))
+               copy(bc.data[i], data)
+       }
+
+       // Copy tags
+       bc.tags = make(map[string][]Tag)
+       for tagName, tagData := range tmpBlock.tags {
+               tagSlice := make([]Tag, len(tagData.values))
+               for i, value := range tagData.values {
+                       tagSlice[i] = Tag{
+                               Name:      tagName,
+                               Value:     value,
+                               ValueType: tagData.valueType,
+                       }
+               }
+               bc.tags[tagName] = tagSlice
+       }
+
+       return len(bc.userKeys) > 0
+}
+
+// copyTo copies the current element to a QueryResponse, similar to 
query_by_ts.go.
+func (bc *blockCursor) copyTo(result *QueryResponse) bool {
+       if bc.idx < 0 || bc.idx >= len(bc.userKeys) {
+               return false
+       }
+
+       // Apply key range filtering
+       key := bc.userKeys[bc.idx]
+       if bc.request.MinKey != nil && key < *bc.request.MinKey {
+               return false
+       }
+       if bc.request.MaxKey != nil && key > *bc.request.MaxKey {
+               return false
+       }
+
+       result.Keys = append(result.Keys, key)
+       result.Data = append(result.Data, bc.data[bc.idx])
+       result.SIDs = append(result.SIDs, bc.seriesID)
+
+       // Copy tags for this element
+       var elementTags []Tag
+       for _, tagSlice := range bc.tags {
+               if bc.idx < len(tagSlice) {
+                       elementTags = append(elementTags, tagSlice[bc.idx])
+               }
+       }
+       result.Tags = append(result.Tags, elementTags)
+       return true
+}
+
+// blockCursorHeap implements heap.Interface for sorting block cursors.
+type blockCursorHeap struct {
+       bcc []*blockCursor
+       asc bool
+}
+
+func (bch blockCursorHeap) Len() int {
+       return len(bch.bcc)
+}
+
+func (bch blockCursorHeap) Less(i, j int) bool {
+       leftIdx, rightIdx := bch.bcc[i].idx, bch.bcc[j].idx
+       if leftIdx >= len(bch.bcc[i].userKeys) || rightIdx >= 
len(bch.bcc[j].userKeys) {
+               return false // Handle bounds check
+       }
+       leftTS := bch.bcc[i].userKeys[leftIdx]
+       rightTS := bch.bcc[j].userKeys[rightIdx]
+       if bch.asc {
+               return leftTS < rightTS
+       }
+       return leftTS > rightTS
+}
+
+func (bch *blockCursorHeap) Swap(i, j int) {
+       bch.bcc[i], bch.bcc[j] = bch.bcc[j], bch.bcc[i]
+}
+
+func (bch *blockCursorHeap) Push(x interface{}) {
+       bch.bcc = append(bch.bcc, x.(*blockCursor))
+}
+
+func (bch *blockCursorHeap) Pop() interface{} {
+       old := bch.bcc
+       n := len(old)
+       x := old[n-1]
+       bch.bcc = old[0 : n-1]
+       releaseBlockCursor(x)
+       return x
+}
+
+func (bch *blockCursorHeap) reset() {
+       for i := range bch.bcc {
+               releaseBlockCursor(bch.bcc[i])
+       }
+       bch.bcc = bch.bcc[:0]
+}
+
+// merge performs heap-based merge similar to query_by_ts.go.
+func (bch *blockCursorHeap) merge(limit int) *QueryResponse {
+       step := -1
+       if bch.asc {
+               step = 1
+       }
+       result := &QueryResponse{
+               Keys: make([]int64, 0),
+               Data: make([][]byte, 0),
+               Tags: make([][]Tag, 0),
+               SIDs: make([]common.SeriesID, 0),
+       }
+
+       for bch.Len() > 0 {
+               topBC := bch.bcc[0]
+               if topBC.idx < 0 || topBC.idx >= len(topBC.userKeys) {
+                       heap.Pop(bch)
+                       continue
+               }
+
+               // Try to copy the element (returns false if filtered out by 
key range)
+               if topBC.copyTo(result) {
+                       if limit > 0 && result.Len() >= limit {
+                               break
+                       }
+               }
+               topBC.idx += step
+
+               if bch.asc {
+                       if topBC.idx >= len(topBC.userKeys) {
+                               heap.Pop(bch)
+                       } else {
+                               heap.Fix(bch, 0)
+                       }
+               } else {
+                       if topBC.idx < 0 {
+                               heap.Pop(bch)
+                       } else {
+                               heap.Fix(bch, 0)
+                       }
+               }
+       }
+
+       return result
+}
+
+var blockCursorHeapPool = 
pool.Register[*blockCursorHeap]("sidx-blockCursorHeap")
+
+func generateBlockCursorHeap(asc bool) *blockCursorHeap {
+       v := blockCursorHeapPool.Get()
+       if v == nil {
+               return &blockCursorHeap{
+                       asc: asc,
+                       bcc: make([]*blockCursor, 0, blockScannerBatchSize),
+               }
+       }
+       v.asc = asc
+       return v
+}
+
+func releaseBlockCursorHeap(bch *blockCursorHeap) {
+       bch.reset()
+       blockCursorHeapPool.Put(bch)
+}
+
+var blockCursorPool = pool.Register[*blockCursor]("sidx-blockCursor")
+
+func generateBlockCursor() *blockCursor {
+       v := blockCursorPool.Get()
+       if v == nil {
+               return &blockCursor{}
+       }
+       return v
+}
+
+func releaseBlockCursor(bc *blockCursor) {
+       bc.p = nil
+       bc.bm = nil
+       bc.userKeys = bc.userKeys[:0]
+       bc.data = bc.data[:0]
+       bc.tags = nil
+       bc.seriesID = 0
+       bc.idx = 0
+       blockCursorPool.Put(bc)
+}
diff --git a/banyand/internal/sidx/sidx_test.go 
b/banyand/internal/sidx/sidx_test.go
index 3b95334a..a668121d 100644
--- a/banyand/internal/sidx/sidx_test.go
+++ b/banyand/internal/sidx/sidx_test.go
@@ -302,11 +302,22 @@ func TestSIDX_Query_Ordering(t *testing.T) {
 
        ctx := context.Background()
 
-       // Write data in non-sorted order
+       // Write data from different seriesIDs in non-sorted order to expose 
ordering bugs
        reqs := []WriteRequest{
-               createTestWriteRequest(1, 300, "data300"),
-               createTestWriteRequest(1, 100, "data100"),
-               createTestWriteRequest(1, 200, "data200"),
+               // Series 1 data
+               createTestWriteRequest(1, 300, "series1-data300"),
+               createTestWriteRequest(1, 100, "series1-data100"),
+               createTestWriteRequest(1, 200, "series1-data200"),
+
+               // Series 2 data
+               createTestWriteRequest(2, 250, "series2-data250"),
+               createTestWriteRequest(2, 150, "series2-data150"),
+               createTestWriteRequest(2, 50, "series2-data50"),
+
+               // Series 3 data
+               createTestWriteRequest(3, 350, "series3-data350"),
+               createTestWriteRequest(3, 75, "series3-data75"),
+               createTestWriteRequest(3, 175, "series3-data175"),
        }
        err := sidx.Write(ctx, reqs)
        require.NoError(t, err)
@@ -317,29 +328,45 @@ func TestSIDX_Query_Ordering(t *testing.T) {
        tests := []struct {
                order     *index.OrderBy
                name      string
+               seriesIDs []common.SeriesID
                ascending bool
        }{
                {
-                       name:      "ascending order",
+                       name:      "ascending order single series",
+                       ascending: true,
+                       order:     &index.OrderBy{Sort: modelv1.Sort_SORT_ASC},
+                       seriesIDs: []common.SeriesID{1},
+               },
+               {
+                       name:      "descending order single series",
+                       ascending: false,
+                       order:     &index.OrderBy{Sort: modelv1.Sort_SORT_DESC},
+                       seriesIDs: []common.SeriesID{1},
+               },
+               {
+                       name:      "ascending order multiple series",
                        ascending: true,
                        order:     &index.OrderBy{Sort: modelv1.Sort_SORT_ASC},
+                       seriesIDs: []common.SeriesID{1, 2, 3},
                },
                {
-                       name:      "descending order",
+                       name:      "descending order multiple series",
                        ascending: false,
                        order:     &index.OrderBy{Sort: modelv1.Sort_SORT_DESC},
+                       seriesIDs: []common.SeriesID{1, 2, 3},
                },
                {
-                       name:      "default order",
+                       name:      "default order multiple series",
                        ascending: true,
                        order:     nil, // Should default to ascending
+                       seriesIDs: []common.SeriesID{1, 2, 3},
                },
        }
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        queryReq := QueryRequest{
-                               SeriesIDs: []common.SeriesID{1},
+                               SeriesIDs: tt.seriesIDs,
                                Order:     tt.order,
                        }
 
@@ -350,6 +377,14 @@ func TestSIDX_Query_Ordering(t *testing.T) {
                        // Get all keys directly from response
                        allKeys := response.Keys
 
+                       // Debug: Print the keys and data to understand the 
ordering
+                       t.Logf("Query with series %v, order %v", tt.seriesIDs, 
tt.order)
+                       for i, key := range allKeys {
+                               if i < len(response.Data) && i < 
len(response.SIDs) {
+                                       t.Logf("  Key: %d, Data: %s, SeriesID: 
%d", key, string(response.Data[i]), response.SIDs[i])
+                               }
+                       }
+
                        // Verify ordering
                        if len(allKeys) > 1 {
                                isSorted := sort.SliceIsSorted(allKeys, func(i, 
j int) bool {
@@ -358,8 +393,8 @@ func TestSIDX_Query_Ordering(t *testing.T) {
                                        }
                                        return allKeys[i] > allKeys[j]
                                })
-                               assert.True(t, isSorted, "Keys should be sorted 
in %s order",
-                                       map[bool]string{true: "ascending", 
false: "descending"}[tt.ascending])
+                               assert.True(t, isSorted, "Keys should be sorted 
in %s order. Keys: %v",
+                                       map[bool]string{true: "ascending", 
false: "descending"}[tt.ascending], allKeys)
                        }
                })
        }
@@ -840,7 +875,7 @@ func 
TestQueryResult_ConvertBlockToResponse_IncrementalLimit(t *testing.T) {
                data:     [][]byte{[]byte("trace2"), []byte("trace2")},
                tags:     make(map[string]*tagData),
        }
-       qr.convertBlockToResponse(block2, 1, result)
+       qr.convertBlockToResponse(block2, 2, result)
 
        // Verify second call results
        assert.Equal(t, 4, result.Len(), "Second call should add 2 more 
elements")
@@ -856,7 +891,7 @@ func 
TestQueryResult_ConvertBlockToResponse_IncrementalLimit(t *testing.T) {
                data:     [][]byte{[]byte("trace3"), []byte("trace3")},
                tags:     make(map[string]*tagData),
        }
-       qr.convertBlockToResponse(block3, 1, result)
+       qr.convertBlockToResponse(block3, 3, result)
 
        // Verify third call results - should not add anything due to limit
        assert.Equal(t, 4, result.Len(), "Third call should not add elements 
due to limit")
@@ -968,7 +1003,7 @@ func TestQueryResponseHeap_MergeWithHeap_UniqueDataLimit(t 
*testing.T) {
                        // Initialize heap and merge
                        // Note: We can't call heap.Init directly since 
QueryResponseHeap is not exported
                        // Instead, we'll test via the public 
mergeQueryResponseShardsAsc function
-                       result := mergeQueryResponseShardsAsc(tt.shards, 
tt.limit)
+                       result := mergeQueryResponseShards(tt.shards, tt.limit)
 
                        // Verify total length
                        assert.Equal(t, tt.expectedLen, result.Len(),
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index 41cfce94..f6f069a1 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -126,6 +126,40 @@ func (pi *partIter) error() error {
        return pi.err
 }
 
+func (pi *partIter) nextTID() bool {
+       if pi.tidIdx >= len(pi.tids) {
+               pi.err = io.EOF
+               return false
+       }
+       pi.curBlock.traceID = pi.tids[pi.tidIdx]
+       pi.tidIdx++
+       return true
+}
+
+func (pi *partIter) searchTargetTID(tid string) bool {
+       if pi.curBlock.traceID >= tid {
+               return true
+       }
+       if !pi.nextTID() {
+               return false
+       }
+       if pi.curBlock.traceID >= tid {
+               return true
+       }
+       tids := pi.tids[pi.tidIdx:]
+       pi.tidIdx += sort.Search(len(tids), func(i int) bool {
+               return tid <= tids[i]
+       })
+       if pi.tidIdx >= len(pi.tids) {
+               pi.tidIdx = len(pi.tids)
+               pi.err = io.EOF
+               return false
+       }
+       pi.curBlock.traceID = pi.tids[pi.tidIdx]
+       pi.tidIdx++
+       return true
+}
+
 func (pi *partIter) loadNextBlockMetadata() bool {
        if len(pi.primaryBlockMetadata) > 0 {
                if !pi.searchTargetTraceID(pi.primaryBlockMetadata[0].traceID) {
@@ -189,7 +223,7 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, 
mr *primaryBlockMetada
 
 func (pi *partIter) findBlock() bool {
        bhs := pi.bms
-       if len(bhs) > 0 {
+       for len(bhs) > 0 {
                tid := pi.curBlock.traceID
                if bhs[0].traceID < tid {
                        n := sort.Search(len(bhs), func(i int) bool {
@@ -203,9 +237,11 @@ func (pi *partIter) findBlock() bool {
                }
                bm := &bhs[0]
 
-               if bm.traceID > tid {
-                       pi.bms = bhs[:0]
-                       return false
+               if bm.traceID != tid {
+                       if !pi.searchTargetTID(bm.traceID) {
+                               return false
+                       }
+                       continue
                }
 
                pi.curBlock = bm
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index a0c328fc..14434932 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -169,6 +169,12 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
                                result.originalSidxOrder = make([]string, 
len(traceIDs))
                                copy(result.originalSidxOrder, traceIDs)
 
+                               // Create sidx order map for efficient heap 
operations
+                               result.sidxOrderMap = make(map[string]int)
+                               for i, traceID := range 
result.originalSidxOrder {
+                                       result.sidxOrderMap[traceID] = i
+                               }
+
                                qo.traceIDs = traceIDs
                                sort.Strings(qo.traceIDs) // Sort for partIter 
efficiency
                        }
@@ -239,10 +245,11 @@ func (t *trace) searchBlocks(ctx context.Context, result 
*queryResult, parts []*
 type queryResult struct {
        ctx               context.Context
        tagProjection     *model.TagProjection
+       sidxOrderMap      map[string]int
        data              []*blockCursor
        snapshots         []*snapshot
        segments          []storage.Segment[*tsTable, option]
-       originalSidxOrder []string // Store original sidx order for final 
sorting
+       originalSidxOrder []string
        hit               int
        loaded            bool
 }
@@ -328,19 +335,13 @@ func (qr *queryResult) resortDataBySidxOrder() {
                return
        }
 
-       // Create a map from trace ID to its position in the original sidx order
-       traceIDToOrder := make(map[string]int)
-       for i, traceID := range qr.originalSidxOrder {
-               traceIDToOrder[traceID] = i
-       }
-
-       // Sort data according to the original sidx order
+       // Sort data according to the original sidx order using the 
pre-computed map
        sort.Slice(qr.data, func(i, j int) bool {
                traceIDi := qr.data[i].bm.traceID
                traceIDj := qr.data[j].bm.traceID
 
-               orderi, existi := traceIDToOrder[traceIDi]
-               orderj, existj := traceIDToOrder[traceIDj]
+               orderi, existi := qr.sidxOrderMap[traceIDi]
+               orderj, existj := qr.sidxOrderMap[traceIDj]
 
                // If both trace IDs are in the original order, use that 
ordering
                if existi && existj {
@@ -378,7 +379,30 @@ func (qr queryResult) Len() int {
 }
 
 func (qr queryResult) Less(i, j int) bool {
-       return qr.data[i].bm.traceID < qr.data[j].bm.traceID
+       // If no original sidx order, return natural order (no sorted merge)
+       if len(qr.originalSidxOrder) == 0 {
+               return false
+       }
+
+       traceIDi := qr.data[i].bm.traceID
+       traceIDj := qr.data[j].bm.traceID
+
+       orderi, existi := qr.sidxOrderMap[traceIDi]
+       orderj, existj := qr.sidxOrderMap[traceIDj]
+
+       // If both trace IDs are in the original order, use that ordering
+       if existi && existj {
+               return orderi < orderj
+       }
+       // If only one is in the original order, prioritize it
+       if existi {
+               return true
+       }
+       if existj {
+               return false
+       }
+       // If neither is in the original order, use alphabetical ordering as 
fallback
+       return traceIDi < traceIDj
 }
 
 func (qr queryResult) Swap(i, j int) {
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index fa9da38c..952c313d 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -175,11 +175,19 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, 
sidxInstances []sidx.S
                return nil, nil
        }
 
-       // Extract trace IDs from response data
-       traceIDs := make([]string, 0, len(response.Data))
+       // Extract trace IDs from response data and deduplicate them while 
preserving order
+       // Since each trace may be indexed by multiple series, we get duplicates
+       // We need to keep only the first occurrence of each trace ID to 
preserve ordering
+       seenTraceIDs := make(map[string]bool)
+       var traceIDs []string
+
        for _, data := range response.Data {
                if len(data) > 0 {
-                       traceIDs = append(traceIDs, string(data))
+                       traceID := string(data)
+                       if !seenTraceIDs[traceID] {
+                               seenTraceIDs[traceID] = true
+                               traceIDs = append(traceIDs, traceID)
+                       }
                }
        }
 
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index a498bc9c..5baf6ba2 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -244,7 +244,15 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable 
*tracesInTable, writeEv
                if tv.valueType != pbv1.ValueTypeInt64 && tv.valueType != 
pbv1.ValueTypeTimestamp {
                        return fmt.Errorf("unsupported tag value type: %s", 
tv.tag)
                }
-               key := req.Tags[tagIdx].GetInt().GetValue()
+
+               var key int64
+               if tv.valueType == pbv1.ValueTypeTimestamp {
+                       // For timestamp tags, get the unix nano timestamp as 
the key
+                       key = 
req.Tags[tagIdx].GetTimestamp().AsTime().UnixNano()
+               } else {
+                       // For int64 tags, get the int value as the key
+                       key = req.Tags[tagIdx].GetInt().GetValue()
+               }
 
                entityValues := make([]*modelv1.TagValue, 0, 
len(indexRule.Tags))
                for _, tagName := range indexRule.Tags {

Reply via email to