This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 439473a92f0a7918fcb6ab205ea377f444d34da5
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 25 16:32:34 2025 +0800

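    Enhance query result processing: introduce a shared tagsToLoad map in
    the queryResult struct so the set of projected tags is computed once
    and shared by all workers. Implement heap-based merging of
    QueryResponse shards, supporting both ascending and descending order.
    Implement loadBlockData (previously a stub) and add loadTagData to
    read, zstd-decompress, and decode a block's user keys, data payloads,
    and tags (only the projected tags, or all of the block's tags when no
    projection is given). Handle compressed data and tag metadata during
    block loading: a block whose keys or data cannot be read or decoded is
    skipped, and a tag that fails to load is left out of the block.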
---
 banyand/internal/sidx/query_result.go | 345 +++++++++++++++++++++++++++++++---
 1 file changed, 314 insertions(+), 31 deletions(-)

diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index 6c3ea5a9..951d73f3 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -18,6 +18,7 @@
 package sidx
 
 import (
+       "container/heap"
        "context"
        "sync"
 
@@ -26,7 +27,10 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 // queryResult implements QueryResult interface with worker pool pattern.
@@ -47,7 +51,8 @@ type queryResult struct {
        l      *logger.Logger   // Logger instance
 
        // Query parameters (derived from request)
-       asc bool // Ordering direction
+       asc        bool                // Ordering direction
+       tagsToLoad map[string]struct{} // Shared map of tags to load across 
workers
 
        // State management
        released bool
@@ -67,6 +72,19 @@ func (qr *queryResult) runBlockScanner() *QueryResponse {
        workerSize := cgroups.CPUs()
        batchCh := make(chan *blockScanResultBatch, workerSize)
 
+       // Determine which tags to load once for all workers (shared 
optimization)
+       if qr.tagsToLoad == nil {
+               qr.tagsToLoad = make(map[string]struct{})
+               if len(qr.request.TagProjection) > 0 {
+                       // Load only projected tags
+                       for _, proj := range qr.request.TagProjection {
+                               for _, tagName := range proj.Names {
+                                       qr.tagsToLoad[tagName] = struct{}{}
+                               }
+                       }
+               }
+       }
+
        // Initialize worker result shards
        if qr.shards == nil {
                qr.shards = make([]*QueryResponse, workerSize)
@@ -150,14 +168,156 @@ func (qr *queryResult) loadAndProcessBlock(tmpBlock 
*block, bs blockScanResult,
 }
 
 // loadBlockData loads block data from part using block metadata.
+// Uses blockCursor pattern for optimal performance with selective tag loading.
 func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm 
*blockMetadata) bool {
-       // TODO: Implement actual block loading from part
-       // This should use the existing block reader functionality
-       _ = tmpBlock // TODO: use when implementing block loading
-       _ = p        // TODO: use when implementing block loading
-       _ = bm       // TODO: use when implementing block loading
-       // For now, return false to avoid processing
-       return false
+       tmpBlock.reset()
+
+       // Early exit if no data
+       if bm.count == 0 {
+               return false
+       }
+
+       // Read and decompress user keys (always needed)
+       compressedKeysBuf := make([]byte, bm.keysBlock.size)
+       _, err := p.keys.Read(int64(bm.keysBlock.offset), compressedKeysBuf)
+       if err != nil {
+               return false
+       }
+
+       keysBuf, err := zstd.Decompress(nil, compressedKeysBuf)
+       if err != nil {
+               return false
+       }
+
+       // Decode user keys
+       tmpBlock.userKeys, err = 
encoding.BytesToInt64List(tmpBlock.userKeys[:0], keysBuf, bm.keysEncodeType, 
bm.minKey, int(bm.count))
+       if err != nil {
+               return false
+       }
+
+       // Read and decompress data payloads (always needed)
+       compressedDataBuf := make([]byte, bm.dataBlock.size)
+       _, err = p.data.Read(int64(bm.dataBlock.offset), compressedDataBuf)
+       if err != nil {
+               return false
+       }
+
+       dataBuf, err := zstd.Decompress(nil, compressedDataBuf)
+       if err != nil {
+               return false
+       }
+
+       // Decode data payloads
+       decoder := &encoding.BytesBlockDecoder{}
+       tmpBlock.data, err = decoder.Decode(tmpBlock.data[:0], dataBuf, 
bm.count)
+       if err != nil {
+               return false
+       }
+
+       // Use shared tagsToLoad map, or determine available tags for this block
+       var tagsToLoad map[string]struct{}
+       if len(qr.tagsToLoad) > 0 {
+               // Use the shared projected tags map
+               tagsToLoad = qr.tagsToLoad
+       } else {
+               // Load all available tags for this specific block
+               tagsToLoad = make(map[string]struct{})
+               for tagName := range bm.tagsBlocks {
+                       tagsToLoad[tagName] = struct{}{}
+               }
+       }
+
+       // Early exit if no tags to load
+       if len(tagsToLoad) == 0 {
+               return len(tmpBlock.userKeys) > 0
+       }
+
+       // Load tag data for selected tags only
+       for tagName := range tagsToLoad {
+               tagBlockInfo, exists := bm.tagsBlocks[tagName]
+               if !exists {
+                       continue // Skip missing tags
+               }
+
+               if !qr.loadTagData(tmpBlock, p, tagName, &tagBlockInfo, 
int(bm.count)) {
+                       // Continue loading other tags even if one fails
+                       continue
+               }
+       }
+
+       return len(tmpBlock.userKeys) > 0
+}
+
+// loadTagData loads data for a specific tag, following the pattern from 
readBlockTags.
+func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, 
tagBlockInfo *dataBlock, count int) bool {
+       // Get tag metadata reader
+       tmReader, tmExists := p.getTagMetadataReader(tagName)
+       if !tmExists {
+               return false
+       }
+
+       // Get tag data reader
+       tdReader, tdExists := p.getTagDataReader(tagName)
+       if !tdExists {
+               return false
+       }
+
+       // Read tag metadata
+       tmData := make([]byte, tagBlockInfo.size)
+       _, err := tmReader.Read(int64(tagBlockInfo.offset), tmData)
+       if err != nil {
+               return false
+       }
+
+       tm, err := unmarshalTagMetadata(tmData)
+       if err != nil {
+               return false
+       }
+       defer releaseTagMetadata(tm)
+
+       // Read and decompress tag data
+       tdData := make([]byte, tm.dataBlock.size)
+       _, err = tdReader.Read(int64(tm.dataBlock.offset), tdData)
+       if err != nil {
+               return false
+       }
+
+       decompressedData, err := zstd.Decompress(nil, tdData)
+       if err != nil {
+               return false
+       }
+
+       // Decode tag values
+       tagValues, err := DecodeTagValues(decompressedData, tm.valueType, count)
+       if err != nil {
+               return false
+       }
+
+       // Create tag data structure and populate block
+       td := generateTagData()
+       td.name = tagName
+       td.valueType = tm.valueType
+       td.indexed = tm.indexed
+       td.values = tagValues
+
+       // Set min/max for int64 tags
+       if tm.valueType == pbv1.ValueTypeInt64 {
+               td.min = tm.min
+               td.max = tm.max
+       }
+
+       // Create bloom filter for indexed tags if needed
+       if tm.indexed {
+               td.filter = generateBloomFilter(count)
+               for _, value := range tagValues {
+                       if value != nil {
+                               td.filter.Add(value)
+                       }
+               }
+       }
+
+       tmpBlock.tags[tagName] = td
+       return true
 }
 
 // convertBlockToResponse converts SIDX block data to QueryResponse format.
@@ -257,42 +417,165 @@ func (qr *queryResult) Release() {
 
 // mergeQueryResponseShardsAsc merges multiple QueryResponse shards in 
ascending order.
 func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
-       result := &QueryResponse{
-               Keys: make([]int64, 0, maxElements),
-               Data: make([][]byte, 0, maxElements),
-               Tags: make([][]Tag, 0, maxElements),
-               SIDs: make([]common.SeriesID, 0, maxElements),
-       }
+       // Create heap for ascending merge
+       qrh := &QueryResponseHeap{asc: true}
 
-       // Simple concatenation for now - TODO: implement proper merge sort
+       // Initialize cursors for non-empty shards
        for _, shard := range shards {
-               for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
-                       result.Keys = append(result.Keys, shard.Keys[i])
-                       result.Data = append(result.Data, shard.Data[i])
-                       result.Tags = append(result.Tags, shard.Tags[i])
-                       result.SIDs = append(result.SIDs, shard.SIDs[i])
+               if shard.Len() > 0 {
+                       qrh.cursors = append(qrh.cursors, &QueryResponseCursor{
+                               response: shard,
+                               idx:      0,
+                       })
                }
        }
 
+       if len(qrh.cursors) == 0 {
+               return &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }
+       }
+
+       // Initialize heap
+       heap.Init(qrh)
+
+       // Perform heap-based merge
+       result := qrh.mergeWithHeap(maxElements)
+
+       // Reset heap
+       qrh.reset()
+
        return result
 }
 
 // mergeQueryResponseShardsDesc merges multiple QueryResponse shards in 
descending order.
 func mergeQueryResponseShardsDesc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
+       // Create heap for descending merge
+       qrh := &QueryResponseHeap{asc: false}
+
+       // Initialize cursors for non-empty shards (start from end for 
descending)
+       for _, shard := range shards {
+               if shard.Len() > 0 {
+                       qrh.cursors = append(qrh.cursors, &QueryResponseCursor{
+                               response: shard,
+                               idx:      shard.Len() - 1, // Start from last 
element for descending
+                       })
+               }
+       }
+
+       if len(qrh.cursors) == 0 {
+               return &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }
+       }
+
+       // Initialize heap
+       heap.Init(qrh)
+
+       // Perform heap-based merge
+       result := qrh.mergeWithHeap(maxElements)
+
+       // Reset heap
+       qrh.reset()
+
+       return result
+}
+
+// QueryResponseCursor wraps a QueryResponse with current iteration position.
+type QueryResponseCursor struct {
+       response *QueryResponse
+       idx      int
+}
+
+// QueryResponseHeap implements heap.Interface for merging QueryResponse 
shards.
+type QueryResponseHeap struct {
+       cursors []*QueryResponseCursor
+       asc     bool
+}
+
+func (qrh QueryResponseHeap) Len() int {
+       return len(qrh.cursors)
+}
+
+func (qrh QueryResponseHeap) Less(i, j int) bool {
+       leftKey := qrh.cursors[i].response.Keys[qrh.cursors[i].idx]
+       rightKey := qrh.cursors[j].response.Keys[qrh.cursors[j].idx]
+       if qrh.asc {
+               return leftKey < rightKey
+       }
+       return leftKey > rightKey
+}
+
+func (qrh *QueryResponseHeap) Swap(i, j int) {
+       qrh.cursors[i], qrh.cursors[j] = qrh.cursors[j], qrh.cursors[i]
+}
+
+func (qrh *QueryResponseHeap) Push(x interface{}) {
+       qrh.cursors = append(qrh.cursors, x.(*QueryResponseCursor))
+}
+
+func (qrh *QueryResponseHeap) Pop() interface{} {
+       old := qrh.cursors
+       n := len(old)
+       x := old[n-1]
+       qrh.cursors = old[0 : n-1]
+       return x
+}
+
+func (qrh *QueryResponseHeap) reset() {
+       qrh.cursors = qrh.cursors[:0]
+}
+
+// mergeWithHeap performs heap-based merge of QueryResponse shards.
+func (qrh *QueryResponseHeap) mergeWithHeap(limit int) *QueryResponse {
        result := &QueryResponse{
-               Keys: make([]int64, 0, maxElements),
-               Data: make([][]byte, 0, maxElements),
-               Tags: make([][]Tag, 0, maxElements),
-               SIDs: make([]common.SeriesID, 0, maxElements),
+               Keys: make([]int64, 0, limit),
+               Data: make([][]byte, 0, limit),
+               Tags: make([][]Tag, 0, limit),
+               SIDs: make([]common.SeriesID, 0, limit),
        }
 
-       // Simple concatenation for now - TODO: implement proper merge sort
-       for _, shard := range shards {
-               for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
-                       result.Keys = append(result.Keys, shard.Keys[i])
-                       result.Data = append(result.Data, shard.Data[i])
-                       result.Tags = append(result.Tags, shard.Tags[i])
-                       result.SIDs = append(result.SIDs, shard.SIDs[i])
+       step := -1
+       if qrh.asc {
+               step = 1
+       }
+
+       for qrh.Len() > 0 {
+               topCursor := qrh.cursors[0]
+               idx := topCursor.idx
+               resp := topCursor.response
+
+               // Copy element from top cursor
+               result.Keys = append(result.Keys, resp.Keys[idx])
+               result.Data = append(result.Data, resp.Data[idx])
+               result.Tags = append(result.Tags, resp.Tags[idx])
+               result.SIDs = append(result.SIDs, resp.SIDs[idx])
+
+               if result.Len() >= limit {
+                       break
+               }
+
+               // Advance cursor
+               topCursor.idx += step
+
+               if qrh.asc {
+                       if topCursor.idx >= resp.Len() {
+                               heap.Pop(qrh)
+                       } else {
+                               heap.Fix(qrh, 0)
+                       }
+               } else {
+                       if topCursor.idx < 0 {
+                               heap.Pop(qrh)
+                       } else {
+                               heap.Fix(qrh, 0)
+                       }
                }
        }
 

Reply via email to