This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 439473a92f0a7918fcb6ab205ea377f444d34da5
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 25 16:32:34 2025 +0800

    Enhance query result processing

    Introduce a shared tag-loading optimization in the queryResult struct to
    improve performance. Implement heap-based merging of QueryResponse
    shards, supporting both ascending and descending merge order. Implement
    the loadBlockData and loadTagData methods for efficient tag-data
    retrieval and processing, and ensure robust handling of compressed data
    and metadata during block loading.
---
 banyand/internal/sidx/query_result.go | 345 +++++++++++++++++++++++++++++++---
 1 file changed, 314 insertions(+), 31 deletions(-)

diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index 6c3ea5a9..951d73f3 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -18,6 +18,7 @@ package sidx
 
 import (
+	"container/heap"
 	"context"
 	"sync"
 
@@ -26,7 +27,10 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/protector"
 	"github.com/apache/skywalking-banyandb/pkg/cgroups"
+	"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 // queryResult implements QueryResult interface with worker pool pattern.
@@ -47,7 +51,8 @@ type queryResult struct {
 	l *logger.Logger // Logger instance
 
 	// Query parameters (derived from request)
-	asc bool // Ordering direction
+	asc        bool                // Ordering direction
+	tagsToLoad map[string]struct{} // Shared map of tags to load across workers
 
 	// State management
 	released bool
@@ -67,6 +72,19 @@ func (qr *queryResult) runBlockScanner() *QueryResponse {
 	workerSize := cgroups.CPUs()
 	batchCh := make(chan *blockScanResultBatch, workerSize)
 
+	// Determine which tags to load once for all workers (shared optimization)
+	if qr.tagsToLoad == nil {
+		qr.tagsToLoad = make(map[string]struct{})
+		if len(qr.request.TagProjection) > 0 {
+			// Load only projected tags
+			for _, proj := range qr.request.TagProjection {
+				for _, tagName := range proj.Names {
+					qr.tagsToLoad[tagName] = struct{}{}
+				}
+			}
+		}
+	}
+
 	// Initialize worker result shards
 	if qr.shards == nil {
 		qr.shards = make([]*QueryResponse, workerSize)
@@ -150,14 +168,156 @@ func (qr *queryResult) loadAndProcessBlock(tmpBlock *block, bs blockScanResult,
 }
 
 // loadBlockData loads block data from part using block metadata.
+// Uses blockCursor pattern for optimal performance with selective tag loading.
 func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata) bool {
-	// TODO: Implement actual block loading from part
-	// This should use the existing block reader functionality
-	_ = tmpBlock // TODO: use when implementing block loading
-	_ = p        // TODO: use when implementing block loading
-	_ = bm       // TODO: use when implementing block loading
-	// For now, return false to avoid processing
-	return false
+	tmpBlock.reset()
+
+	// Early exit if no data
+	if bm.count == 0 {
+		return false
+	}
+
+	// Read and decompress user keys (always needed)
+	compressedKeysBuf := make([]byte, bm.keysBlock.size)
+	_, err := p.keys.Read(int64(bm.keysBlock.offset), compressedKeysBuf)
+	if err != nil {
+		return false
+	}
+
+	keysBuf, err := zstd.Decompress(nil, compressedKeysBuf)
+	if err != nil {
+		return false
+	}
+
+	// Decode user keys
+	tmpBlock.userKeys, err = encoding.BytesToInt64List(tmpBlock.userKeys[:0], keysBuf, bm.keysEncodeType, bm.minKey, int(bm.count))
+	if err != nil {
+		return false
+	}
+
+	// Read and decompress data payloads (always needed)
+	compressedDataBuf := make([]byte, bm.dataBlock.size)
+	_, err = p.data.Read(int64(bm.dataBlock.offset), compressedDataBuf)
+	if err != nil {
+		return false
+	}
+
+	dataBuf, err := zstd.Decompress(nil, compressedDataBuf)
+	if err != nil {
+		return false
+	}
+
+	// Decode data payloads
+	decoder := &encoding.BytesBlockDecoder{}
+	tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], dataBuf, bm.count)
+	if err != nil {
+		return false
+	}
+
+	// Use shared tagsToLoad map, or determine available tags for this block
+	var tagsToLoad map[string]struct{}
+	if len(qr.tagsToLoad) > 0 {
+		// Use the shared projected tags map
+		tagsToLoad = qr.tagsToLoad
+	} else {
+		// Load all available tags for this specific block
+		tagsToLoad = make(map[string]struct{})
+		for tagName := range bm.tagsBlocks {
+			tagsToLoad[tagName] = struct{}{}
+		}
+	}
+
+	// Early exit if no tags to load
+	if len(tagsToLoad) == 0 {
+		return len(tmpBlock.userKeys) > 0
+	}
+
+	// Load tag data for selected tags only
+	for tagName := range tagsToLoad {
+		tagBlockInfo, exists := bm.tagsBlocks[tagName]
+		if !exists {
+			continue // Skip missing tags
+		}
+
+		if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, int(bm.count)) {
+			// Continue loading other tags even if one fails
+			continue
+		}
+	}
+
+	return len(tmpBlock.userKeys) > 0
+}
+
+// loadTagData loads data for a specific tag, following the pattern from readBlockTags.
+func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tagBlockInfo *dataBlock, count int) bool {
+	// Get tag metadata reader
+	tmReader, tmExists := p.getTagMetadataReader(tagName)
+	if !tmExists {
+		return false
+	}
+
+	// Get tag data reader
+	tdReader, tdExists := p.getTagDataReader(tagName)
+	if !tdExists {
+		return false
+	}
+
+	// Read tag metadata
+	tmData := make([]byte, tagBlockInfo.size)
+	_, err := tmReader.Read(int64(tagBlockInfo.offset), tmData)
+	if err != nil {
+		return false
+	}
+
+	tm, err := unmarshalTagMetadata(tmData)
+	if err != nil {
+		return false
+	}
+	defer releaseTagMetadata(tm)
+
+	// Read and decompress tag data
+	tdData := make([]byte, tm.dataBlock.size)
+	_, err = tdReader.Read(int64(tm.dataBlock.offset), tdData)
+	if err != nil {
+		return false
+	}
+
+	decompressedData, err := zstd.Decompress(nil, tdData)
+	if err != nil {
+		return false
+	}
+
+	// Decode tag values
+	tagValues, err := DecodeTagValues(decompressedData, tm.valueType, count)
+	if err != nil {
+		return false
+	}
+
+	// Create tag data structure and populate block
+	td := generateTagData()
+	td.name = tagName
+	td.valueType = tm.valueType
+	td.indexed = tm.indexed
+	td.values = tagValues
+
+	// Set min/max for int64 tags
+	if tm.valueType == pbv1.ValueTypeInt64 {
+		td.min = tm.min
+		td.max = tm.max
+	}
+
+	// Create bloom filter for indexed tags if needed
+	if tm.indexed {
+		td.filter = generateBloomFilter(count)
+		for _, value := range tagValues {
+			if value != nil {
+				td.filter.Add(value)
+			}
+		}
+	}
+
+	tmpBlock.tags[tagName] = td
+	return true
 }
 
 // convertBlockToResponse converts SIDX block data to QueryResponse format.
@@ -257,42 +417,165 @@ func (qr *queryResult) Release() {
 
 // mergeQueryResponseShardsAsc merges multiple QueryResponse shards in ascending order.
 func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) *QueryResponse {
-	result := &QueryResponse{
-		Keys: make([]int64, 0, maxElements),
-		Data: make([][]byte, 0, maxElements),
-		Tags: make([][]Tag, 0, maxElements),
-		SIDs: make([]common.SeriesID, 0, maxElements),
-	}
+	// Create heap for ascending merge
+	qrh := &QueryResponseHeap{asc: true}
 
-	// Simple concatenation for now - TODO: implement proper merge sort
+	// Initialize cursors for non-empty shards
 	for _, shard := range shards {
-		for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
-			result.Keys = append(result.Keys, shard.Keys[i])
-			result.Data = append(result.Data, shard.Data[i])
-			result.Tags = append(result.Tags, shard.Tags[i])
-			result.SIDs = append(result.SIDs, shard.SIDs[i])
+		if shard.Len() > 0 {
+			qrh.cursors = append(qrh.cursors, &QueryResponseCursor{
+				response: shard,
+				idx:      0,
+			})
 		}
 	}
 
+	if len(qrh.cursors) == 0 {
+		return &QueryResponse{
+			Keys: make([]int64, 0),
+			Data: make([][]byte, 0),
+			Tags: make([][]Tag, 0),
+			SIDs: make([]common.SeriesID, 0),
+		}
+	}
+
+	// Initialize heap
+	heap.Init(qrh)
+
+	// Perform heap-based merge
+	result := qrh.mergeWithHeap(maxElements)
+
+	// Reset heap
+	qrh.reset()
+
 	return result
 }
 
 // mergeQueryResponseShardsDesc merges multiple QueryResponse shards in descending order.
 func mergeQueryResponseShardsDesc(shards []*QueryResponse, maxElements int) *QueryResponse {
+	// Create heap for descending merge
+	qrh := &QueryResponseHeap{asc: false}
+
+	// Initialize cursors for non-empty shards (start from end for descending)
+	for _, shard := range shards {
+		if shard.Len() > 0 {
+			qrh.cursors = append(qrh.cursors, &QueryResponseCursor{
+				response: shard,
+				idx:      shard.Len() - 1, // Start from last element for descending
+			})
+		}
+	}
+
+	if len(qrh.cursors) == 0 {
+		return &QueryResponse{
+			Keys: make([]int64, 0),
+			Data: make([][]byte, 0),
+			Tags: make([][]Tag, 0),
+			SIDs: make([]common.SeriesID, 0),
+		}
+	}
+
+	// Initialize heap
+	heap.Init(qrh)
+
+	// Perform heap-based merge
+	result := qrh.mergeWithHeap(maxElements)
+
+	// Reset heap
+	qrh.reset()
+
+	return result
+}
+
+// QueryResponseCursor wraps a QueryResponse with current iteration position.
+type QueryResponseCursor struct {
+	response *QueryResponse
+	idx      int
+}
+
+// QueryResponseHeap implements heap.Interface for merging QueryResponse shards.
+type QueryResponseHeap struct {
+	cursors []*QueryResponseCursor
+	asc     bool
+}
+
+func (qrh QueryResponseHeap) Len() int {
+	return len(qrh.cursors)
+}
+
+func (qrh QueryResponseHeap) Less(i, j int) bool {
+	leftKey := qrh.cursors[i].response.Keys[qrh.cursors[i].idx]
+	rightKey := qrh.cursors[j].response.Keys[qrh.cursors[j].idx]
+	if qrh.asc {
+		return leftKey < rightKey
+	}
+	return leftKey > rightKey
+}
+
+func (qrh *QueryResponseHeap) Swap(i, j int) {
+	qrh.cursors[i], qrh.cursors[j] = qrh.cursors[j], qrh.cursors[i]
+}
+
+func (qrh *QueryResponseHeap) Push(x interface{}) {
+	qrh.cursors = append(qrh.cursors, x.(*QueryResponseCursor))
+}
+
+func (qrh *QueryResponseHeap) Pop() interface{} {
+	old := qrh.cursors
+	n := len(old)
+	x := old[n-1]
+	qrh.cursors = old[0 : n-1]
+	return x
+}
+
+func (qrh *QueryResponseHeap) reset() {
+	qrh.cursors = qrh.cursors[:0]
+}
+
+// mergeWithHeap performs heap-based merge of QueryResponse shards.
+func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
 	result := &QueryResponse{
-		Keys: make([]int64, 0, maxElements),
-		Data: make([][]byte, 0, maxElements),
-		Tags: make([][]Tag, 0, maxElements),
-		SIDs: make([]common.SeriesID, 0, maxElements),
+		Keys: make([]int64, 0, limit),
+		Data: make([][]byte, 0, limit),
+		Tags: make([][]Tag, 0, limit),
+		SIDs: make([]common.SeriesID, 0, limit),
 	}
 
-	// Simple concatenation for now - TODO: implement proper merge sort
-	for _, shard := range shards {
-		for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
-			result.Keys = append(result.Keys, shard.Keys[i])
-			result.Data = append(result.Data, shard.Data[i])
-			result.Tags = append(result.Tags, shard.Tags[i])
-			result.SIDs = append(result.SIDs, shard.SIDs[i])
+	step := -1
+	if qrh.asc {
+		step = 1
+	}
+
+	for qrh.Len() > 0 {
+		topCursor := qrh.cursors[0]
+		idx := topCursor.idx
+		resp := topCursor.response
+
+		// Copy element from top cursor
+		result.Keys = append(result.Keys, resp.Keys[idx])
+		result.Data = append(result.Data, resp.Data[idx])
+		result.Tags = append(result.Tags, resp.Tags[idx])
+		result.SIDs = append(result.SIDs, resp.SIDs[idx])
+
+		if result.Len() >= limit {
+			break
+		}
+
+		// Advance cursor
+		topCursor.idx += step
+
+		if qrh.asc {
+			if topCursor.idx >= resp.Len() {
+				heap.Pop(qrh)
+			} else {
+				heap.Fix(qrh, 0)
+			}
+		} else {
+			if topCursor.idx < 0 {
+				heap.Pop(qrh)
+			} else {
+				heap.Fix(qrh, 0)
+			}
 		}
 	}
 
 	return result
 }
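
On the tag-loading side, the shared tagsToLoad map means the projection is
computed once in runBlockScanner and reused by every worker, while
loadBlockData falls back to loading all of a block's tags only when no
projection was requested. A small sketch of that decision logic, using
illustrative inputs (buildTagsToLoad, projection, and blockTags are
hypothetical names, not the package's real request types):

package main

import "fmt"

// buildTagsToLoad mirrors the shared-map logic added to runBlockScanner and
// loadBlockData: projected tag names win; otherwise every tag present in the
// block is loaded. The inputs here are illustrative stand-ins.
func buildTagsToLoad(projection [][]string, blockTags map[string]struct{}) map[string]struct{} {
	tags := make(map[string]struct{})
	for _, names := range projection {
		for _, name := range names {
			tags[name] = struct{}{}
		}
	}
	if len(tags) > 0 {
		return tags // computed once, shared by all workers
	}
	// No projection: fall back to all tags available in this block.
	for name := range blockTags {
		tags[name] = struct{}{}
	}
	return tags
}

func main() {
	block := map[string]struct{}{"service": {}, "instance": {}, "endpoint": {}}
	fmt.Println(buildTagsToLoad([][]string{{"service"}}, block)) // map[service:{}]
	fmt.Println(len(buildTagsToLoad(nil, block)))                // 3 (all block tags)
}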
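
The new merge path is a textbook k-way merge driven by container/heap: each
cursor points at one shard's current element, Less compares the keys under the
cursors (flipped for descending order), and the loop repeatedly copies the heap
head, advances its cursor, and either re-heapifies or drops the exhausted
shard. To make those semantics concrete, here is a minimal, self-contained
sketch of the same technique. It assumes each shard's keys are internally
sorted in ascending order (which is why the descending merge starts every
cursor at the last element and walks backwards); response, cursor, and
cursorHeap are simplified stand-ins for illustration, not the sidx package's
actual types.

package main

import (
	"container/heap"
	"fmt"
)

// response is a simplified stand-in for a QueryResponse shard: keys are
// assumed to be sorted in ascending order within each shard.
type response struct{ keys []int64 }

// cursor pairs a shard with the current iteration position.
type cursor struct {
	resp *response
	idx  int
}

// cursorHeap orders cursors by their current key, ascending or descending.
type cursorHeap struct {
	cursors []*cursor
	asc     bool
}

func (h cursorHeap) Len() int { return len(h.cursors) }
func (h cursorHeap) Less(i, j int) bool {
	l, r := h.cursors[i].resp.keys[h.cursors[i].idx], h.cursors[j].resp.keys[h.cursors[j].idx]
	if h.asc {
		return l < r
	}
	return l > r
}
func (h *cursorHeap) Swap(i, j int)      { h.cursors[i], h.cursors[j] = h.cursors[j], h.cursors[i] }
func (h *cursorHeap) Push(x interface{}) { h.cursors = append(h.cursors, x.(*cursor)) }
func (h *cursorHeap) Pop() interface{} {
	old := h.cursors
	x := old[len(old)-1]
	h.cursors = old[:len(old)-1]
	return x
}

// merge repeatedly takes the smallest (asc) or largest (desc) head element,
// up to limit elements in total, mirroring mergeWithHeap above.
func merge(shards []*response, asc bool, limit int) []int64 {
	h := &cursorHeap{asc: asc}
	for _, s := range shards {
		if len(s.keys) == 0 {
			continue
		}
		idx := 0
		if !asc {
			idx = len(s.keys) - 1 // descending merge walks each shard backwards
		}
		h.cursors = append(h.cursors, &cursor{resp: s, idx: idx})
	}
	heap.Init(h)

	step := 1
	if !asc {
		step = -1
	}
	var out []int64
	for h.Len() > 0 && len(out) < limit {
		top := h.cursors[0]
		out = append(out, top.resp.keys[top.idx])
		top.idx += step
		if top.idx < 0 || top.idx >= len(top.resp.keys) {
			heap.Pop(h) // shard exhausted
		} else {
			heap.Fix(h, 0) // head key changed; restore heap order
		}
	}
	return out
}

func main() {
	shards := []*response{
		{keys: []int64{1, 4, 9}},
		{keys: []int64{2, 3, 10}},
		{keys: []int64{5}},
	}
	fmt.Println(merge(shards, true, 100)) // [1 2 3 4 5 9 10]
	fmt.Println(merge(shards, false, 4))  // [10 9 5 4] -- the limit caps output
}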
