This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 898681f63965b333e6eaaa7051ff3742c4483651 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Aug 27 08:45:07 2025 +0800 Refactor mock components and query result structure: Rearrange fields in mockQuerierResult and mockSIDXQueryResult for improved clarity. Update queryResult struct to include request field and optimize data handling with buffer reuse for decompression operations. Ensure efficient reading and processing of metadata and data payloads. --- banyand/internal/sidx/mock_components.go | 2 +- banyand/internal/sidx/mock_sidx.go | 2 +- banyand/internal/sidx/query_result.go | 68 ++++++++++++++++++++++---------- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/banyand/internal/sidx/mock_components.go b/banyand/internal/sidx/mock_components.go index 729dfa8e..2e9894e4 100644 --- a/banyand/internal/sidx/mock_components.go +++ b/banyand/internal/sidx/mock_components.go @@ -307,8 +307,8 @@ func (mq *MockQuerier) SetErrorRate(rate int) { // mockQuerierResult implements QueryResult for the mock querier. type mockQuerierResult struct { - request QueryRequest elements []WriteRequest + request QueryRequest position int finished bool } diff --git a/banyand/internal/sidx/mock_sidx.go b/banyand/internal/sidx/mock_sidx.go index 00bb9713..51b39df8 100644 --- a/banyand/internal/sidx/mock_sidx.go +++ b/banyand/internal/sidx/mock_sidx.go @@ -338,9 +338,9 @@ func (m *MockSIDX) updateMemoryUsageLocked() { // mockSIDXQueryResult implements QueryResult for the mock implementation. type mockSIDXQueryResult struct { - request QueryRequest stats *Stats elements []mockElement + request QueryRequest position int finished bool } diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index c5e5d10f..d7789428 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -26,9 +26,11 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/cgroups" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) @@ -36,7 +38,6 @@ import ( // queryResult implements QueryResult interface with worker pool pattern. // Following the tsResult architecture from the stream module. type queryResult struct { - request QueryRequest ctx context.Context pm protector.Memory snapshot *snapshot @@ -45,6 +46,7 @@ type queryResult struct { tagsToLoad map[string]struct{} parts []*part shards []*QueryResponse + request QueryRequest asc bool released bool } @@ -177,13 +179,19 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata } // Read and decompress user keys (always needed) - compressedKeysBuf := make([]byte, bm.keysBlock.size) - _, err := p.keys.Read(int64(bm.keysBlock.offset), compressedKeysBuf) - if err != nil { - return false + bb := bigValuePool.Get() + if bb == nil { + bb = &bytes.Buffer{} } + defer func() { + bb.Buf = bb.Buf[:0] + bigValuePool.Put(bb) + }() - keysBuf, err := zstd.Decompress(nil, compressedKeysBuf) + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(bm.keysBlock.size)) + fs.MustReadData(p.keys, int64(bm.keysBlock.offset), bb.Buf) + + keysBuf, err := zstd.Decompress(nil, bb.Buf) if err != nil { return false } @@ -195,13 +203,19 @@ func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata } // Read and decompress data payloads (always needed) - compressedDataBuf := make([]byte, bm.dataBlock.size) - _, err = p.data.Read(int64(bm.dataBlock.offset), compressedDataBuf) - if err != nil { - return false + bb2 := bigValuePool.Get() + if bb2 == nil { + bb2 = &bytes.Buffer{} } + defer func() { + bb2.Buf = bb2.Buf[:0] + bigValuePool.Put(bb2) + }() - dataBuf, err := zstd.Decompress(nil, compressedDataBuf) + bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(bm.dataBlock.size)) + fs.MustReadData(p.data, int64(bm.dataBlock.offset), bb2.Buf) + + dataBuf, err := zstd.Decompress(nil, bb2.Buf) if err != nil { return false } @@ -262,26 +276,38 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag } // Read tag metadata - tmData := make([]byte, tagBlockInfo.size) - _, err := tmReader.Read(int64(tagBlockInfo.offset), tmData) - if err != nil { - return false + bb := bigValuePool.Get() + if bb == nil { + bb = &bytes.Buffer{} } + defer func() { + bb.Buf = bb.Buf[:0] + bigValuePool.Put(bb) + }() + + bb.Buf = bytes.ResizeOver(bb.Buf[:0], int(tagBlockInfo.size)) + fs.MustReadData(tmReader, int64(tagBlockInfo.offset), bb.Buf) - tm, err := unmarshalTagMetadata(tmData) + tm, err := unmarshalTagMetadata(bb.Buf) if err != nil { return false } defer releaseTagMetadata(tm) // Read and decompress tag data - tdData := make([]byte, tm.dataBlock.size) - _, err = tdReader.Read(int64(tm.dataBlock.offset), tdData) - if err != nil { - return false + bb2 := bigValuePool.Get() + if bb2 == nil { + bb2 = &bytes.Buffer{} } + defer func() { + bb2.Buf = bb2.Buf[:0] + bigValuePool.Put(bb2) + }() + + bb2.Buf = bytes.ResizeOver(bb2.Buf[:0], int(tm.dataBlock.size)) + fs.MustReadData(tdReader, int64(tm.dataBlock.offset), bb2.Buf) - decompressedData, err := zstd.Decompress(nil, tdData) + decompressedData, err := zstd.Decompress(nil, bb2.Buf) if err != nil { return false }
