This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7e7066eeadf3e3d6f9b079e8ded6fd5a889a440f Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 31 14:17:23 2025 +0800 Refactor sidx query handling by consolidating merge functions and removing redundant block processing logic. --- banyand/internal/sidx/multi_sidx_query.go | 2 +- banyand/internal/sidx/query_result.go | 19 +- banyand/internal/sidx/sidx.go | 292 +++++++++++++++++++++++++++--- banyand/internal/sidx/sidx_test.go | 61 +++++-- banyand/trace/part_iter.go | 44 ++++- banyand/trace/query.go | 46 +++-- banyand/trace/trace.go | 14 +- banyand/trace/write_standalone.go | 10 +- 8 files changed, 409 insertions(+), 79 deletions(-) diff --git a/banyand/internal/sidx/multi_sidx_query.go b/banyand/internal/sidx/multi_sidx_query.go index 3424792b..d8abca97 100644 --- a/banyand/internal/sidx/multi_sidx_query.go +++ b/banyand/internal/sidx/multi_sidx_query.go @@ -169,7 +169,7 @@ func mergeMultipleSIDXResponses(responses []*QueryResponse, req QueryRequest) *Q // Use existing merge functions if asc { - return mergeQueryResponseShardsAsc(responses, req.MaxElementSize) + return mergeQueryResponseShards(responses, req.MaxElementSize) } return mergeQueryResponseShardsDesc(responses, req.MaxElementSize) } diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index 2ab63fec..c035870f 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -39,21 +39,6 @@ type queryResult struct { request QueryRequest } -// loadAndProcessBlock loads a block from part and processes it into QueryResponse format. -func (qr *queryResult) loadAndProcessBlock(tmpBlock *block, bs blockScanResult, result *QueryResponse) bool { - tmpBlock.reset() - - // Load block data from part (similar to stream's loadBlockCursor) - if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) { - return false - } - - // Convert block data to QueryResponse format - qr.convertBlockToResponse(tmpBlock, bs.bm.seriesID, result) - - return true -} - // loadBlockData loads block data from part using block metadata. // Uses blockCursor pattern for optimal performance with selective tag loading. func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata) bool { @@ -309,8 +294,8 @@ func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag { return elementTags } -// mergeQueryResponseShardsAsc merges multiple QueryResponse shards in ascending order. -func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) *QueryResponse { +// mergeQueryResponseShards merges multiple QueryResponse shards. +func mergeQueryResponseShards(shards []*QueryResponse, maxElements int) *QueryResponse { // Create heap for ascending merge qrh := &QueryResponseHeap{asc: true} diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index cb6f9722..9a312b9d 100644 --- a/banyand/internal/sidx/sidx.go +++ b/banyand/internal/sidx/sidx.go @@ -18,6 +18,7 @@ package sidx import ( + "container/heap" "context" "math" "sort" @@ -31,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/cgroups" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -225,7 +227,7 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, err } }() - response := s.executeBlockScannerQuery(ctx, bs, req, asc) + response := s.executeBlockScannerQuery(ctx, bs, req) if response == nil { return &QueryResponse{ Keys: make([]int64, 0), @@ -244,7 +246,7 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, err } // executeBlockScannerQuery coordinates the worker pool with block scanner following tsResult pattern. -func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, req QueryRequest, asc bool) *QueryResponse { +func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, req QueryRequest) *QueryResponse { workerSize := cgroups.CPUs() batchCh := make(chan *blockScanResultBatch, workerSize) @@ -290,10 +292,10 @@ func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, r workerWg.Wait() // Merge results from all workers - return s.mergeWorkerResults(shards, asc, req.MaxElementSize) + return s.mergeWorkerResults(shards, req.MaxElementSize) } -// processWorkerBatches processes batches in a worker goroutine. +// processWorkerBatches processes batches in a worker goroutine using heap-based approach. func (s *sidx) processWorkerBatches( _ int, batchCh chan *blockScanResultBatch, shard *QueryResponse, tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory, @@ -301,6 +303,10 @@ func (s *sidx) processWorkerBatches( tmpBlock := generateBlock() defer releaseBlock(tmpBlock) + asc := extractOrdering(req) + blockHeap := generateBlockCursorHeap(asc) + defer releaseBlockCursorHeap(blockHeap) + for batch := range batchCh { if batch.err != nil { shard.Error = batch.err @@ -308,19 +314,37 @@ func (s *sidx) processWorkerBatches( continue } + // Load all blocks in this batch and create block cursors for _, bs := range batch.bss { - if !s.loadAndProcessBlock(tmpBlock, bs, shard, tagsToLoad, req, pm) { - // If load fails, continue with next block rather than stopping - continue + bc := generateBlockCursor() + bc.init(bs.p, &bs.bm, req) + + if s.loadBlockCursor(bc, tmpBlock, bs, tagsToLoad, req, pm) { + // Set starting index based on sort order + if asc { + bc.idx = 0 + } else { + bc.idx = len(bc.userKeys) - 1 + } + blockHeap.Push(bc) + } else { + releaseBlockCursor(bc) } } releaseBlockScanResultBatch(batch) } + + // Initialize heap and merge results + if blockHeap.Len() > 0 { + heap.Init(blockHeap) + result := blockHeap.merge(req.MaxElementSize) + shard.CopyFrom(result) + } } // mergeWorkerResults merges results from all worker shards with error handling. -func (s *sidx) mergeWorkerResults(shards []*QueryResponse, asc bool, maxElementSize int) *QueryResponse { +func (s *sidx) mergeWorkerResults(shards []*QueryResponse, maxElementSize int) *QueryResponse { // Check for errors first var err error for i := range shards { @@ -333,27 +357,9 @@ func (s *sidx) mergeWorkerResults(shards []*QueryResponse, asc bool, maxElementS return &QueryResponse{Error: err} } - // Merge results with ordering - if asc { - return mergeQueryResponseShardsAsc(shards, maxElementSize) - } - return mergeQueryResponseShardsDesc(shards, maxElementSize) -} - -// loadAndProcessBlock delegates to the queryResult implementation for now. -func (s *sidx) loadAndProcessBlock( - tmpBlock *block, bs blockScanResult, result *QueryResponse, - tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory, -) bool { - // Create a temporary queryResult to reuse existing logic - qr := &queryResult{ - request: req, - tagsToLoad: tagsToLoad, - pm: pm, - l: s.l, - } - - return qr.loadAndProcessBlock(tmpBlock, bs, result) + // Merge results - shards are already in the requested order + // Just use ascending merge since shards are pre-sorted + return mergeQueryResponseShards(shards, maxElementSize) } // Stats implements SIDX interface. @@ -526,3 +532,231 @@ func newGC(_ *Options) *gc { func (g *gc) clean() { // TODO: Implement garbage collection } + +// blockCursor represents a cursor for iterating through a loaded block, similar to query_by_ts.go. +type blockCursor struct { + p *part + bm *blockMetadata + tags map[string][]Tag + userKeys []int64 + data [][]byte + request QueryRequest + seriesID common.SeriesID + idx int +} + +// init initializes the block cursor. +func (bc *blockCursor) init(p *part, bm *blockMetadata, req QueryRequest) { + bc.p = p + bc.bm = bm + bc.request = req + bc.seriesID = bm.seriesID + bc.idx = 0 +} + +// loadBlockCursor loads block data into the cursor, similar to loadBlockCursor in query_by_ts.go. +func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock *block, bs blockScanResult, tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory) bool { + tmpBlock.reset() + + // Create a temporary queryResult to reuse existing logic + qr := &queryResult{ + request: req, + tagsToLoad: tagsToLoad, + pm: pm, + l: s.l, + } + + // Load the block data + if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) { + return false + } + + // Copy data to block cursor + bc.userKeys = make([]int64, len(tmpBlock.userKeys)) + copy(bc.userKeys, tmpBlock.userKeys) + + bc.data = make([][]byte, len(tmpBlock.data)) + for i, data := range tmpBlock.data { + bc.data[i] = make([]byte, len(data)) + copy(bc.data[i], data) + } + + // Copy tags + bc.tags = make(map[string][]Tag) + for tagName, tagData := range tmpBlock.tags { + tagSlice := make([]Tag, len(tagData.values)) + for i, value := range tagData.values { + tagSlice[i] = Tag{ + Name: tagName, + Value: value, + ValueType: tagData.valueType, + } + } + bc.tags[tagName] = tagSlice + } + + return len(bc.userKeys) > 0 +} + +// copyTo copies the current element to a QueryResponse, similar to query_by_ts.go. +func (bc *blockCursor) copyTo(result *QueryResponse) bool { + if bc.idx < 0 || bc.idx >= len(bc.userKeys) { + return false + } + + // Apply key range filtering + key := bc.userKeys[bc.idx] + if bc.request.MinKey != nil && key < *bc.request.MinKey { + return false + } + if bc.request.MaxKey != nil && key > *bc.request.MaxKey { + return false + } + + result.Keys = append(result.Keys, key) + result.Data = append(result.Data, bc.data[bc.idx]) + result.SIDs = append(result.SIDs, bc.seriesID) + + // Copy tags for this element + var elementTags []Tag + for _, tagSlice := range bc.tags { + if bc.idx < len(tagSlice) { + elementTags = append(elementTags, tagSlice[bc.idx]) + } + } + result.Tags = append(result.Tags, elementTags) + return true +} + +// blockCursorHeap implements heap.Interface for sorting block cursors. +type blockCursorHeap struct { + bcc []*blockCursor + asc bool +} + +func (bch blockCursorHeap) Len() int { + return len(bch.bcc) +} + +func (bch blockCursorHeap) Less(i, j int) bool { + leftIdx, rightIdx := bch.bcc[i].idx, bch.bcc[j].idx + if leftIdx >= len(bch.bcc[i].userKeys) || rightIdx >= len(bch.bcc[j].userKeys) { + return false // Handle bounds check + } + leftTS := bch.bcc[i].userKeys[leftIdx] + rightTS := bch.bcc[j].userKeys[rightIdx] + if bch.asc { + return leftTS < rightTS + } + return leftTS > rightTS +} + +func (bch *blockCursorHeap) Swap(i, j int) { + bch.bcc[i], bch.bcc[j] = bch.bcc[j], bch.bcc[i] +} + +func (bch *blockCursorHeap) Push(x interface{}) { + bch.bcc = append(bch.bcc, x.(*blockCursor)) +} + +func (bch *blockCursorHeap) Pop() interface{} { + old := bch.bcc + n := len(old) + x := old[n-1] + bch.bcc = old[0 : n-1] + releaseBlockCursor(x) + return x +} + +func (bch *blockCursorHeap) reset() { + for i := range bch.bcc { + releaseBlockCursor(bch.bcc[i]) + } + bch.bcc = bch.bcc[:0] +} + +// merge performs heap-based merge similar to query_by_ts.go. +func (bch *blockCursorHeap) merge(limit int) *QueryResponse { + step := -1 + if bch.asc { + step = 1 + } + result := &QueryResponse{ + Keys: make([]int64, 0), + Data: make([][]byte, 0), + Tags: make([][]Tag, 0), + SIDs: make([]common.SeriesID, 0), + } + + for bch.Len() > 0 { + topBC := bch.bcc[0] + if topBC.idx < 0 || topBC.idx >= len(topBC.userKeys) { + heap.Pop(bch) + continue + } + + // Try to copy the element (returns false if filtered out by key range) + if topBC.copyTo(result) { + if limit > 0 && result.Len() >= limit { + break + } + } + topBC.idx += step + + if bch.asc { + if topBC.idx >= len(topBC.userKeys) { + heap.Pop(bch) + } else { + heap.Fix(bch, 0) + } + } else { + if topBC.idx < 0 { + heap.Pop(bch) + } else { + heap.Fix(bch, 0) + } + } + } + + return result +} + +var blockCursorHeapPool = pool.Register[*blockCursorHeap]("sidx-blockCursorHeap") + +func generateBlockCursorHeap(asc bool) *blockCursorHeap { + v := blockCursorHeapPool.Get() + if v == nil { + return &blockCursorHeap{ + asc: asc, + bcc: make([]*blockCursor, 0, blockScannerBatchSize), + } + } + v.asc = asc + return v +} + +func releaseBlockCursorHeap(bch *blockCursorHeap) { + bch.reset() + blockCursorHeapPool.Put(bch) +} + +var blockCursorPool = pool.Register[*blockCursor]("sidx-blockCursor") + +func generateBlockCursor() *blockCursor { + v := blockCursorPool.Get() + if v == nil { + return &blockCursor{} + } + return v +} + +func releaseBlockCursor(bc *blockCursor) { + bc.p = nil + bc.bm = nil + bc.userKeys = bc.userKeys[:0] + bc.data = bc.data[:0] + bc.tags = nil + bc.seriesID = 0 + bc.idx = 0 + blockCursorPool.Put(bc) +} diff --git a/banyand/internal/sidx/sidx_test.go b/banyand/internal/sidx/sidx_test.go index 3b95334a..a668121d 100644 --- a/banyand/internal/sidx/sidx_test.go +++ b/banyand/internal/sidx/sidx_test.go @@ -302,11 +302,22 @@ func TestSIDX_Query_Ordering(t *testing.T) { ctx := context.Background() - // Write data in non-sorted order + // Write data from different seriesIDs in non-sorted order to expose ordering bugs reqs := []WriteRequest{ - createTestWriteRequest(1, 300, "data300"), - createTestWriteRequest(1, 100, "data100"), - createTestWriteRequest(1, 200, "data200"), + // Series 1 data + createTestWriteRequest(1, 300, "series1-data300"), + createTestWriteRequest(1, 100, "series1-data100"), + createTestWriteRequest(1, 200, "series1-data200"), + + // Series 2 data + createTestWriteRequest(2, 250, "series2-data250"), + createTestWriteRequest(2, 150, "series2-data150"), + createTestWriteRequest(2, 50, "series2-data50"), + + // Series 3 data + createTestWriteRequest(3, 350, "series3-data350"), + createTestWriteRequest(3, 75, "series3-data75"), + createTestWriteRequest(3, 175, "series3-data175"), } err := sidx.Write(ctx, reqs) require.NoError(t, err) @@ -317,29 +328,45 @@ func TestSIDX_Query_Ordering(t *testing.T) { tests := []struct { order *index.OrderBy name string + seriesIDs []common.SeriesID ascending bool }{ { - name: "ascending order", + name: "ascending order single series", + ascending: true, + order: &index.OrderBy{Sort: modelv1.Sort_SORT_ASC}, + seriesIDs: []common.SeriesID{1}, + }, + { + name: "descending order single series", + ascending: false, + order: &index.OrderBy{Sort: modelv1.Sort_SORT_DESC}, + seriesIDs: []common.SeriesID{1}, + }, + { + name: "ascending order multiple series", ascending: true, order: &index.OrderBy{Sort: modelv1.Sort_SORT_ASC}, + seriesIDs: []common.SeriesID{1, 2, 3}, }, { - name: "descending order", + name: "descending order multiple series", ascending: false, order: &index.OrderBy{Sort: modelv1.Sort_SORT_DESC}, + seriesIDs: []common.SeriesID{1, 2, 3}, }, { - name: "default order", + name: "default order multiple series", ascending: true, order: nil, // Should default to ascending + seriesIDs: []common.SeriesID{1, 2, 3}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { queryReq := QueryRequest{ - SeriesIDs: []common.SeriesID{1}, + SeriesIDs: tt.seriesIDs, Order: tt.order, } @@ -350,6 +377,14 @@ func TestSIDX_Query_Ordering(t *testing.T) { // Get all keys directly from response allKeys := response.Keys + // Debug: Print the keys and data to understand the ordering + t.Logf("Query with series %v, order %v", tt.seriesIDs, tt.order) + for i, key := range allKeys { + if i < len(response.Data) && i < len(response.SIDs) { + t.Logf(" Key: %d, Data: %s, SeriesID: %d", key, string(response.Data[i]), response.SIDs[i]) + } + } + // Verify ordering if len(allKeys) > 1 { isSorted := sort.SliceIsSorted(allKeys, func(i, j int) bool { @@ -358,8 +393,8 @@ func TestSIDX_Query_Ordering(t *testing.T) { } return allKeys[i] > allKeys[j] }) - assert.True(t, isSorted, "Keys should be sorted in %s order", - map[bool]string{true: "ascending", false: "descending"}[tt.ascending]) + assert.True(t, isSorted, "Keys should be sorted in %s order. Keys: %v", + map[bool]string{true: "ascending", false: "descending"}[tt.ascending], allKeys) } }) } @@ -840,7 +875,7 @@ func TestQueryResult_ConvertBlockToResponse_IncrementalLimit(t *testing.T) { data: [][]byte{[]byte("trace2"), []byte("trace2")}, tags: make(map[string]*tagData), } - qr.convertBlockToResponse(block2, 1, result) + qr.convertBlockToResponse(block2, 2, result) // Verify second call results assert.Equal(t, 4, result.Len(), "Second call should add 2 more elements") @@ -856,7 +891,7 @@ func TestQueryResult_ConvertBlockToResponse_IncrementalLimit(t *testing.T) { data: [][]byte{[]byte("trace3"), []byte("trace3")}, tags: make(map[string]*tagData), } - qr.convertBlockToResponse(block3, 1, result) + qr.convertBlockToResponse(block3, 3, result) // Verify third call results - should not add anything due to limit assert.Equal(t, 4, result.Len(), "Third call should not add elements due to limit") @@ -968,7 +1003,7 @@ func TestQueryResponseHeap_MergeWithHeap_UniqueDataLimit(t *testing.T) { // Initialize heap and merge // Note: We can't call heap.Init directly since QueryResponseHeap is not exported // Instead, we'll test via the public mergeQueryResponseShardsAsc function - result := mergeQueryResponseShardsAsc(tt.shards, tt.limit) + result := mergeQueryResponseShards(tt.shards, tt.limit) // Verify total length assert.Equal(t, tt.expectedLen, result.Len(), diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go index 41cfce94..f6f069a1 100644 --- a/banyand/trace/part_iter.go +++ b/banyand/trace/part_iter.go @@ -126,6 +126,40 @@ func (pi *partIter) error() error { return pi.err } +func (pi *partIter) nextTID() bool { + if pi.tidIdx >= len(pi.tids) { + pi.err = io.EOF + return false + } + pi.curBlock.traceID = pi.tids[pi.tidIdx] + pi.tidIdx++ + return true +} + +func (pi *partIter) searchTargetTID(tid string) bool { + if pi.curBlock.traceID >= tid { + return true + } + if !pi.nextTID() { + return false + } + if pi.curBlock.traceID >= tid { + return true + } + tids := pi.tids[pi.tidIdx:] + pi.tidIdx += sort.Search(len(tids), func(i int) bool { + return tid <= tids[i] + }) + if pi.tidIdx >= len(pi.tids) { + pi.tidIdx = len(pi.tids) + pi.err = io.EOF + return false + } + pi.curBlock.traceID = pi.tids[pi.tidIdx] + pi.tidIdx++ + return true +} + func (pi *partIter) loadNextBlockMetadata() bool { if len(pi.primaryBlockMetadata) > 0 { if !pi.searchTargetTraceID(pi.primaryBlockMetadata[0].traceID) { @@ -189,7 +223,7 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada func (pi *partIter) findBlock() bool { bhs := pi.bms - if len(bhs) > 0 { + for len(bhs) > 0 { tid := pi.curBlock.traceID if bhs[0].traceID < tid { n := sort.Search(len(bhs), func(i int) bool { @@ -203,9 +237,11 @@ func (pi *partIter) findBlock() bool { } bm := &bhs[0] - if bm.traceID > tid { - pi.bms = bhs[:0] - return false + if bm.traceID != tid { + if !pi.searchTargetTID(bm.traceID) { + return false + } + continue } pi.curBlock = bm diff --git a/banyand/trace/query.go b/banyand/trace/query.go index a0c328fc..14434932 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -169,6 +169,12 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T result.originalSidxOrder = make([]string, len(traceIDs)) copy(result.originalSidxOrder, traceIDs) + // Create sidx order map for efficient heap operations + result.sidxOrderMap = make(map[string]int) + for i, traceID := range result.originalSidxOrder { + result.sidxOrderMap[traceID] = i + } + qo.traceIDs = traceIDs sort.Strings(qo.traceIDs) // Sort for partIter efficiency } @@ -239,10 +245,11 @@ func (t *trace) searchBlocks(ctx context.Context, result *queryResult, parts []* type queryResult struct { ctx context.Context tagProjection *model.TagProjection + sidxOrderMap map[string]int data []*blockCursor snapshots []*snapshot segments []storage.Segment[*tsTable, option] - originalSidxOrder []string // Store original sidx order for final sorting + originalSidxOrder []string hit int loaded bool } @@ -328,19 +335,13 @@ func (qr *queryResult) resortDataBySidxOrder() { return } - // Create a map from trace ID to its position in the original sidx order - traceIDToOrder := make(map[string]int) - for i, traceID := range qr.originalSidxOrder { - traceIDToOrder[traceID] = i - } - - // Sort data according to the original sidx order + // Sort data according to the original sidx order using the pre-computed map sort.Slice(qr.data, func(i, j int) bool { traceIDi := qr.data[i].bm.traceID traceIDj := qr.data[j].bm.traceID - orderi, existi := traceIDToOrder[traceIDi] - orderj, existj := traceIDToOrder[traceIDj] + orderi, existi := qr.sidxOrderMap[traceIDi] + orderj, existj := qr.sidxOrderMap[traceIDj] // If both trace IDs are in the original order, use that ordering if existi && existj { @@ -378,7 +379,30 @@ func (qr queryResult) Len() int { } func (qr queryResult) Less(i, j int) bool { - return qr.data[i].bm.traceID < qr.data[j].bm.traceID + // If no original sidx order, return natural order (no sorted merge) + if len(qr.originalSidxOrder) == 0 { + return false + } + + traceIDi := qr.data[i].bm.traceID + traceIDj := qr.data[j].bm.traceID + + orderi, existi := qr.sidxOrderMap[traceIDi] + orderj, existj := qr.sidxOrderMap[traceIDj] + + // If both trace IDs are in the original order, use that ordering + if existi && existj { + return orderi < orderj + } + // If only one is in the original order, prioritize it + if existi { + return true + } + if existj { + return false + } + // If neither is in the original order, use alphabetical ordering as fallback + return traceIDi < traceIDj } func (qr queryResult) Swap(i, j int) { diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index fa9da38c..952c313d 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -175,11 +175,19 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.S return nil, nil } - // Extract trace IDs from response data - traceIDs := make([]string, 0, len(response.Data)) + // Extract trace IDs from response data and deduplicate them while preserving order + // Since each trace may be indexed by multiple series, we get duplicates + // We need to keep only the first occurrence of each trace ID to preserve ordering + seenTraceIDs := make(map[string]bool) + var traceIDs []string + for _, data := range response.Data { if len(data) > 0 { - traceIDs = append(traceIDs, string(data)) + traceID := string(data) + if !seenTraceIDs[traceID] { + seenTraceIDs[traceID] = true + traceIDs = append(traceIDs, traceID) + } } } diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index a498bc9c..5baf6ba2 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -244,7 +244,15 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEv if tv.valueType != pbv1.ValueTypeInt64 && tv.valueType != pbv1.ValueTypeTimestamp { return fmt.Errorf("unsupported tag value type: %s", tv.tag) } - key := req.Tags[tagIdx].GetInt().GetValue() + + var key int64 + if tv.valueType == pbv1.ValueTypeTimestamp { + // For timestamp tags, get the unix nano timestamp as the key + key = req.Tags[tagIdx].GetTimestamp().AsTime().UnixNano() + } else { + // For int64 tags, get the int value as the key + key = req.Tags[tagIdx].GetInt().GetValue() + } entityValues := make([]*modelv1.TagValue, 0, len(indexRule.Tags)) for _, tagName := range indexRule.Tags {
