This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 0dd4a426e841a7155894dfc95553729def920c3e
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Aug 30 16:25:43 2025 +0800

    Refactor query result handling by removing the QueryResult interface and 
integrating its logic directly into the sidx query process. This change 
simplifies the code by eliminating unnecessary abstractions and streamlining 
the worker pool pattern for block scanning. Additionally, the queryResult 
struct has been modified to focus solely on internal processing, enhancing 
maintainability and performance.
---
 banyand/internal/sidx/interfaces.go        |   14 +-
 banyand/internal/sidx/query_result.go      |  149 +--
 banyand/internal/sidx/query_result_test.go | 1426 ----------------------------
 banyand/internal/sidx/sidx.go              |  152 ++-
 4 files changed, 129 insertions(+), 1612 deletions(-)

diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index 91b2bb61..f20362ec 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -99,18 +99,6 @@ type Querier interface {
        Query(ctx context.Context, req QueryRequest) (*QueryResponse, error)
 }
 
-// QueryResult provides iterator-like access to query results, following 
BanyanDB pattern.
-type QueryResult interface {
-       // Pull returns the next batch of query results.
-       // Returns nil when no more results are available.
-       // Check QueryResponse.Error for execution errors during iteration.
-       Pull() *QueryResponse
-
-       // Release releases resources associated with the query result.
-       // Must be called when done with the QueryResult to prevent resource 
leaks.
-       Release()
-}
-
 // WriteRequest contains data for a single write operation within a batch.
 // The user provides the ordering key as an int64 value that sidx treats 
opaquely.
 type WriteRequest struct {
@@ -523,7 +511,7 @@ func (qr *QueryRequest) CopyFrom(other *QueryRequest) {
 //             return s.writer.Write(ctx, reqs)
 //     }
 //
-//     func (s *sidxImpl) Query(ctx context.Context, req QueryRequest) 
(QueryResult, error) {
+//     func (s *sidxImpl) Query(ctx context.Context, req QueryRequest) 
(*QueryResponse, error) {
 //             return s.querier.Query(ctx, req)
 //     }
 //
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
index fa3ddc15..078a00de 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -19,16 +19,11 @@ package sidx
 
 import (
        "container/heap"
-       "context"
-       "sync"
-
-       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
-       "github.com/apache/skywalking-banyandb/pkg/cgroups"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -36,116 +31,12 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-// queryResult implements QueryResult interface with worker pool pattern.
-// Following the tsResult architecture from the stream module.
+// queryResult is used internally for processing logic only.
 type queryResult struct {
-       ctx        context.Context
        pm         protector.Memory
-       snapshot   *snapshot
-       bs         *blockScanner
        l          *logger.Logger
        tagsToLoad map[string]struct{}
-       shards     []*QueryResponse
        request    QueryRequest
-       asc        bool
-       released   bool
-}
-
-// Pull returns the next batch of query results using parallel worker 
processing.
-func (qr *queryResult) Pull() *QueryResponse {
-       if qr.released || qr.bs == nil {
-               return nil
-       }
-
-       return qr.runBlockScanner()
-}
-
-// runBlockScanner coordinates the worker pool with block scanner following 
tsResult pattern.
-func (qr *queryResult) runBlockScanner() *QueryResponse {
-       workerSize := cgroups.CPUs()
-       batchCh := make(chan *blockScanResultBatch, workerSize)
-
-       // Determine which tags to load once for all workers (shared 
optimization)
-       if qr.tagsToLoad == nil {
-               qr.tagsToLoad = make(map[string]struct{})
-               if len(qr.request.TagProjection) > 0 {
-                       // Load only projected tags
-                       for _, proj := range qr.request.TagProjection {
-                               for _, tagName := range proj.Names {
-                                       qr.tagsToLoad[tagName] = struct{}{}
-                               }
-                       }
-               }
-       }
-
-       // Initialize worker result shards
-       if qr.shards == nil {
-               qr.shards = make([]*QueryResponse, workerSize)
-               for i := range qr.shards {
-                       qr.shards[i] = &QueryResponse{
-                               Keys: make([]int64, 0),
-                               Data: make([][]byte, 0),
-                               Tags: make([][]Tag, 0),
-                               SIDs: make([]common.SeriesID, 0),
-                       }
-               }
-       } else {
-               // Reset existing shards
-               for i := range qr.shards {
-                       qr.shards[i].Reset()
-               }
-       }
-
-       // Launch worker pool
-       var workerWg sync.WaitGroup
-       workerWg.Add(workerSize)
-
-       for i := range workerSize {
-               go func(workerID int) {
-                       defer workerWg.Done()
-                       qr.processWorkerBatches(workerID, batchCh)
-               }(i)
-       }
-
-       // Start block scanning
-       go func() {
-               qr.bs.scan(qr.ctx, batchCh)
-               close(batchCh)
-       }()
-
-       workerWg.Wait()
-
-       // Check for completion
-       if len(qr.bs.parts) == 0 {
-               qr.bs.close()
-               qr.bs = nil
-       }
-
-       // Merge results from all workers
-       return qr.mergeWorkerResults()
-}
-
-// processWorkerBatches processes batches in a worker goroutine.
-func (qr *queryResult) processWorkerBatches(workerID int, batchCh chan 
*blockScanResultBatch) {
-       tmpBlock := generateBlock()
-       defer releaseBlock(tmpBlock)
-
-       for batch := range batchCh {
-               if batch.err != nil {
-                       qr.shards[workerID].Error = batch.err
-                       releaseBlockScanResultBatch(batch)
-                       continue
-               }
-
-               for _, bs := range batch.bss {
-                       if !qr.loadAndProcessBlock(tmpBlock, bs, 
qr.shards[workerID]) {
-                               // If load fails, continue with next block 
rather than stopping
-                               continue
-                       }
-               }
-
-               releaseBlockScanResultBatch(batch)
-       }
 }
 
 // loadAndProcessBlock loads a block from part and processes it into 
QueryResponse format.
@@ -400,44 +291,6 @@ func (qr *queryResult) extractElementTags(block *block, 
elemIndex int) []Tag {
        return elementTags
 }
 
-// mergeWorkerResults merges results from all worker shards with error 
handling.
-func (qr *queryResult) mergeWorkerResults() *QueryResponse {
-       // Check for errors first
-       var err error
-       for i := range qr.shards {
-               if qr.shards[i].Error != nil {
-                       err = multierr.Append(err, qr.shards[i].Error)
-               }
-       }
-
-       if err != nil {
-               return &QueryResponse{Error: err}
-       }
-
-       // Merge results with ordering from request
-       if qr.asc {
-               return mergeQueryResponseShardsAsc(qr.shards, 
qr.request.MaxElementSize)
-       }
-       return mergeQueryResponseShardsDesc(qr.shards, 
qr.request.MaxElementSize)
-}
-
-// Release releases resources associated with the query result.
-func (qr *queryResult) Release() {
-       if qr.released {
-               return
-       }
-       qr.released = true
-
-       if qr.bs != nil {
-               qr.bs.close()
-       }
-
-       if qr.snapshot != nil {
-               qr.snapshot.decRef()
-               qr.snapshot = nil
-       }
-}
-
 // mergeQueryResponseShardsAsc merges multiple QueryResponse shards in 
ascending order.
 func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
        // Create heap for ascending merge
diff --git a/banyand/internal/sidx/query_result_test.go 
b/banyand/internal/sidx/query_result_test.go
deleted file mode 100644
index 5d94628c..00000000
--- a/banyand/internal/sidx/query_result_test.go
+++ /dev/null
@@ -1,1426 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package sidx
-
-import (
-       "bytes"
-       "container/heap"
-       "context"
-       "testing"
-
-       "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/banyand/internal/test"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/query/model"
-)
-
-func TestQueryResponseHeap_BasicOperations(t *testing.T) {
-       tests := []struct {
-               name string
-               asc  bool
-       }{
-               {"ascending", true},
-               {"descending", false},
-       }
-
-       for _, tt := range tests {
-               t.Run(tt.name, func(t *testing.T) {
-                       qrh := &QueryResponseHeap{asc: tt.asc}
-
-                       if qrh.Len() != 0 {
-                               t.Error("New heap should be empty")
-                       }
-
-                       response1 := &QueryResponse{
-                               Keys: []int64{10, 20},
-                               Data: [][]byte{[]byte("data1"), 
[]byte("data2")},
-                               SIDs: []common.SeriesID{1, 2},
-                       }
-                       response2 := &QueryResponse{
-                               Keys: []int64{5, 15},
-                               Data: [][]byte{[]byte("data3"), 
[]byte("data4")},
-                               SIDs: []common.SeriesID{3, 4},
-                       }
-
-                       cursor1 := &QueryResponseCursor{response: response1, 
idx: 0}
-                       cursor2 := &QueryResponseCursor{response: response2, 
idx: 0}
-
-                       qrh.cursors = append(qrh.cursors, cursor1, cursor2)
-
-                       if qrh.Len() != 2 {
-                               t.Errorf("Expected heap length 2, got %d", 
qrh.Len())
-                       }
-
-                       heap.Init(qrh)
-
-                       if tt.asc {
-                               if !qrh.Less(0, 1) && 
qrh.cursors[0].response.Keys[0] < qrh.cursors[1].response.Keys[0] {
-                                       t.Error("Ascending heap ordering is 
incorrect")
-                               }
-                       } else {
-                               if !qrh.Less(0, 1) && 
qrh.cursors[0].response.Keys[0] > qrh.cursors[1].response.Keys[0] {
-                                       t.Error("Descending heap ordering is 
incorrect")
-                               }
-                       }
-
-                       originalCursor0 := qrh.cursors[0]
-                       qrh.Swap(0, 1)
-                       if qrh.cursors[0] == originalCursor0 {
-                               t.Error("Swap operation failed")
-                       }
-
-                       qrh.reset()
-                       if qrh.Len() != 0 {
-                               t.Error("Reset should clear all cursors")
-                       }
-               })
-       }
-}
-
-func TestQueryResponseHeap_PushPop(t *testing.T) {
-       qrh := &QueryResponseHeap{asc: true}
-
-       response1 := &QueryResponse{
-               Keys: []int64{10},
-               Data: [][]byte{[]byte("data1")},
-               SIDs: []common.SeriesID{1},
-       }
-       response2 := &QueryResponse{
-               Keys: []int64{5},
-               Data: [][]byte{[]byte("data2")},
-               SIDs: []common.SeriesID{2},
-       }
-       response3 := &QueryResponse{
-               Keys: []int64{15},
-               Data: [][]byte{[]byte("data3")},
-               SIDs: []common.SeriesID{3},
-       }
-
-       cursor1 := &QueryResponseCursor{response: response1, idx: 0}
-       cursor2 := &QueryResponseCursor{response: response2, idx: 0}
-       cursor3 := &QueryResponseCursor{response: response3, idx: 0}
-
-       heap.Init(qrh)
-       heap.Push(qrh, cursor1)
-       heap.Push(qrh, cursor2)
-       heap.Push(qrh, cursor3)
-
-       if qrh.Len() != 3 {
-               t.Errorf("Expected heap length 3, got %d", qrh.Len())
-       }
-
-       top := heap.Pop(qrh).(*QueryResponseCursor)
-       if top.response.Keys[0] != 5 {
-               t.Errorf("Expected top element key 5, got %d", 
top.response.Keys[0])
-       }
-
-       if qrh.Len() != 2 {
-               t.Errorf("Expected heap length 2 after pop, got %d", qrh.Len())
-       }
-
-       top = heap.Pop(qrh).(*QueryResponseCursor)
-       if top.response.Keys[0] != 10 {
-               t.Errorf("Expected second element key 10, got %d", 
top.response.Keys[0])
-       }
-
-       top = heap.Pop(qrh).(*QueryResponseCursor)
-       if top.response.Keys[0] != 15 {
-               t.Errorf("Expected third element key 15, got %d", 
top.response.Keys[0])
-       }
-
-       if qrh.Len() != 0 {
-               t.Error("Heap should be empty after all pops")
-       }
-}
-
-func TestQueryResponseHeap_MergeWithHeapAscending(t *testing.T) {
-       response1 := &QueryResponse{
-               Keys: []int64{1, 5, 9},
-               Data: [][]byte{[]byte("a1"), []byte("a5"), []byte("a9")},
-               Tags: [][]Tag{
-                       {{Name: "tag1", Value: []byte("val1"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val5"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val9"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{1, 1, 1},
-       }
-       response2 := &QueryResponse{
-               Keys: []int64{2, 6, 10},
-               Data: [][]byte{[]byte("b2"), []byte("b6"), []byte("b10")},
-               Tags: [][]Tag{
-                       {{Name: "tag2", Value: []byte("val2"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val6"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val10"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{2, 2, 2},
-       }
-       response3 := &QueryResponse{
-               Keys: []int64{3, 7},
-               Data: [][]byte{[]byte("c3"), []byte("c7")},
-               Tags: [][]Tag{
-                       {{Name: "tag3", Value: []byte("val3"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag3", Value: []byte("val7"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{3, 3},
-       }
-
-       qrh := &QueryResponseHeap{asc: true}
-       qrh.cursors = []*QueryResponseCursor{
-               {response: response1, idx: 0},
-               {response: response2, idx: 0},
-               {response: response3, idx: 0},
-       }
-
-       heap.Init(qrh)
-
-       result := qrh.mergeWithHeap(10)
-
-       expectedKeys := []int64{1, 2, 3, 5, 6, 7, 9, 10}
-       if len(result.Keys) != len(expectedKeys) {
-               t.Fatalf("Expected %d keys, got %d", len(expectedKeys), 
len(result.Keys))
-       }
-
-       for i, key := range expectedKeys {
-               if result.Keys[i] != key {
-                       t.Errorf("At position %d: expected key %d, got %d", i, 
key, result.Keys[i])
-               }
-       }
-
-       if len(result.Data) != len(result.Keys) {
-               t.Error("Data length should match keys length")
-       }
-       if len(result.Tags) != len(result.Keys) {
-               t.Error("Tags length should match keys length")
-       }
-       if len(result.SIDs) != len(result.Keys) {
-               t.Error("SIDs length should match keys length")
-       }
-
-       if string(result.Data[0]) != "a1" {
-               t.Errorf("Expected first data 'a1', got '%s'", 
string(result.Data[0]))
-       }
-       if string(result.Data[1]) != "b2" {
-               t.Errorf("Expected second data 'b2', got '%s'", 
string(result.Data[1]))
-       }
-}
-
-func TestQueryResponseHeap_MergeWithHeapDescending(t *testing.T) {
-       // For descending merge, we start from the end of each response
-       // The cursor starts at the last index and moves backwards (step = -1)
-       // So response1 starts at idx=2 (key=1), response2 starts at idx=2 
(key=2)
-       // Since it's descending heap, it prioritizes larger values
-       // So first element should be 2, then 1, then response moves 
backwards...
-       response1 := &QueryResponse{
-               Keys: []int64{1, 5, 9}, // Will be accessed backwards: 9, 5, 1
-               Data: [][]byte{[]byte("a1"), []byte("a5"), []byte("a9")},
-               Tags: [][]Tag{
-                       {{Name: "tag1", Value: []byte("val1"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val5"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val9"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{1, 1, 1},
-       }
-       response2 := &QueryResponse{
-               Keys: []int64{2, 6, 10}, // Will be accessed backwards: 10, 6, 2
-               Data: [][]byte{[]byte("b2"), []byte("b6"), []byte("b10")},
-               Tags: [][]Tag{
-                       {{Name: "tag2", Value: []byte("val2"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val6"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val10"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{2, 2, 2},
-       }
-
-       qrh := &QueryResponseHeap{asc: false}
-       qrh.cursors = []*QueryResponseCursor{
-               {response: response1, idx: response1.Len() - 1}, // starts at 
idx=2 (key=1)
-               {response: response2, idx: response2.Len() - 1}, // starts at 
idx=2 (key=2)
-       }
-
-       heap.Init(qrh)
-
-       result := qrh.mergeWithHeap(10)
-
-       // Since descending heap prioritizes larger values and we start from 
the end,
-       // First: cursor2 has key=10 (larger), cursor1 has key=9
-       // After taking 10, cursor2 moves to idx=1 (key=6), cursor1 still at 
idx=2 (key=9)
-       // Next: cursor1 has key=9 (larger), so take 9
-       // After taking 9, cursor1 moves to idx=1 (key=5), cursor2 still at 
idx=1 (key=6)
-       // Next: cursor2 has key=6 (larger), so take 6
-       // After taking 6, cursor2 moves to idx=0 (key=2), cursor1 still at 
idx=1 (key=5)
-       // Next: cursor1 has key=5 (larger), so take 5
-       // After taking 5, cursor1 moves to idx=0 (key=1), cursor2 still at 
idx=0 (key=2)
-       // Next: cursor2 has key=2 (larger), so take 2
-       // After taking 2, cursor2 is exhausted, cursor1 takes over with 1
-       expectedKeys := []int64{10, 9, 6, 5, 2, 1}
-       if len(result.Keys) != len(expectedKeys) {
-               t.Fatalf("Expected %d keys, got %d", len(expectedKeys), 
len(result.Keys))
-       }
-
-       for i, key := range expectedKeys {
-               if result.Keys[i] != key {
-                       t.Errorf("At position %d: expected key %d, got %d", i, 
key, result.Keys[i])
-               }
-       }
-
-       if string(result.Data[0]) != "b10" {
-               t.Errorf("Expected first data 'b10', got '%s'", 
string(result.Data[0]))
-       }
-       if string(result.Data[1]) != "a9" {
-               t.Errorf("Expected second data 'a9', got '%s'", 
string(result.Data[1]))
-       }
-}
-
-func TestQueryResponseHeap_MergeWithLimit(t *testing.T) {
-       response1 := &QueryResponse{
-               Keys: []int64{1, 3, 5},
-               Data: [][]byte{[]byte("a1"), []byte("a3"), []byte("a5")},
-               Tags: [][]Tag{
-                       {{Name: "tag1", Value: []byte("val1"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val3"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag1", Value: []byte("val5"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{1, 1, 1},
-       }
-       response2 := &QueryResponse{
-               Keys: []int64{2, 4, 6},
-               Data: [][]byte{[]byte("b2"), []byte("b4"), []byte("b6")},
-               Tags: [][]Tag{
-                       {{Name: "tag2", Value: []byte("val2"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val4"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "tag2", Value: []byte("val6"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{2, 2, 2},
-       }
-
-       qrh := &QueryResponseHeap{asc: true}
-       qrh.cursors = []*QueryResponseCursor{
-               {response: response1, idx: 0},
-               {response: response2, idx: 0},
-       }
-
-       heap.Init(qrh)
-
-       result := qrh.mergeWithHeap(3)
-
-       expectedKeys := []int64{1, 2, 3}
-       if len(result.Keys) != len(expectedKeys) {
-               t.Fatalf("Expected %d keys due to limit, got %d", 
len(expectedKeys), len(result.Keys))
-       }
-
-       for i, key := range expectedKeys {
-               if result.Keys[i] != key {
-                       t.Errorf("At position %d: expected key %d, got %d", i, 
key, result.Keys[i])
-               }
-       }
-}
-
-func TestQueryResponseHeap_EdgeCases(t *testing.T) {
-       t.Run("empty heap", func(t *testing.T) {
-               qrh := &QueryResponseHeap{asc: true}
-               result := qrh.mergeWithHeap(10)
-
-               if result.Len() != 0 {
-                       t.Error("Empty heap should produce empty result")
-               }
-       })
-
-       t.Run("single element", func(t *testing.T) {
-               response := &QueryResponse{
-                       Keys: []int64{42},
-                       Data: [][]byte{[]byte("single")},
-                       Tags: [][]Tag{{{Name: "tag", Value: []byte("value"), 
ValueType: pbv1.ValueTypeStr}}},
-                       SIDs: []common.SeriesID{1},
-               }
-
-               qrh := &QueryResponseHeap{asc: true}
-               qrh.cursors = []*QueryResponseCursor{{response: response, idx: 
0}}
-               heap.Init(qrh)
-
-               result := qrh.mergeWithHeap(10)
-
-               if result.Len() != 1 {
-                       t.Errorf("Expected 1 element, got %d", result.Len())
-               }
-               if result.Keys[0] != 42 {
-                       t.Errorf("Expected key 42, got %d", result.Keys[0])
-               }
-               if string(result.Data[0]) != "single" {
-                       t.Errorf("Expected data 'single', got '%s'", 
string(result.Data[0]))
-               }
-       })
-
-       t.Run("zero limit", func(t *testing.T) {
-               response := &QueryResponse{
-                       Keys: []int64{1, 2, 3},
-                       Data: [][]byte{[]byte("a"), []byte("b"), []byte("c")},
-                       Tags: [][]Tag{
-                               {{Name: "tag", Value: []byte("val1"), 
ValueType: pbv1.ValueTypeStr}},
-                               {{Name: "tag", Value: []byte("val2"), 
ValueType: pbv1.ValueTypeStr}},
-                               {{Name: "tag", Value: []byte("val3"), 
ValueType: pbv1.ValueTypeStr}},
-                       },
-                       SIDs: []common.SeriesID{1, 1, 1},
-               }
-
-               qrh := &QueryResponseHeap{asc: true}
-               qrh.cursors = []*QueryResponseCursor{{response: response, idx: 
0}}
-               heap.Init(qrh)
-
-               result := qrh.mergeWithHeap(0)
-
-               // With limit=0, should return all elements since limit=0 means 
no limit
-               expectedLen := 3
-               if result.Len() != expectedLen {
-                       t.Errorf("With zero limit (no limit), expected %d 
elements, got %d", expectedLen, result.Len())
-               }
-               if result.Len() > 0 && result.Keys[0] != 1 {
-                       t.Errorf("Expected first key to be 1, got %d", 
result.Keys[0])
-               }
-       })
-
-       t.Run("empty response in cursor", func(t *testing.T) {
-               normalResponse := &QueryResponse{
-                       Keys: []int64{5},
-                       Data: [][]byte{[]byte("normal")},
-                       Tags: [][]Tag{{{Name: "tag", Value: []byte("value"), 
ValueType: pbv1.ValueTypeStr}}},
-                       SIDs: []common.SeriesID{1},
-               }
-
-               qrh := &QueryResponseHeap{asc: true}
-               qrh.cursors = []*QueryResponseCursor{
-                       {response: normalResponse, idx: 0},
-               }
-
-               heap.Init(qrh)
-
-               result := qrh.mergeWithHeap(10)
-
-               if result.Len() != 1 {
-                       t.Errorf("Expected 1 element from non-empty response, 
got %d", result.Len())
-               }
-               if result.Keys[0] != 5 {
-                       t.Errorf("Expected key 5, got %d", result.Keys[0])
-               }
-       })
-}
-
-func TestMergeQueryResponseShardsAsc(t *testing.T) {
-       shard1 := &QueryResponse{
-               Keys: []int64{1, 5, 9},
-               Data: [][]byte{[]byte("s1_1"), []byte("s1_5"), []byte("s1_9")},
-               Tags: [][]Tag{
-                       {{Name: "shard1", Value: []byte("val1"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard1", Value: []byte("val5"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard1", Value: []byte("val9"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{1, 1, 1},
-       }
-       shard2 := &QueryResponse{
-               Keys: []int64{2, 6, 10},
-               Data: [][]byte{[]byte("s2_2"), []byte("s2_6"), []byte("s2_10")},
-               Tags: [][]Tag{
-                       {{Name: "shard2", Value: []byte("val2"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard2", Value: []byte("val6"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard2", Value: []byte("val10"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{2, 2, 2},
-       }
-       shard3 := &QueryResponse{
-               Keys: []int64{3, 7},
-               Data: [][]byte{[]byte("s3_3"), []byte("s3_7")},
-               Tags: [][]Tag{
-                       {{Name: "shard3", Value: []byte("val3"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard3", Value: []byte("val7"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{3, 3},
-       }
-
-       shards := []*QueryResponse{shard1, shard2, shard3}
-
-       result := mergeQueryResponseShardsAsc(shards, 100)
-
-       expectedKeys := []int64{1, 2, 3, 5, 6, 7, 9, 10}
-       if len(result.Keys) != len(expectedKeys) {
-               t.Fatalf("Expected %d keys, got %d", len(expectedKeys), 
len(result.Keys))
-       }
-
-       for i, key := range expectedKeys {
-               if result.Keys[i] != key {
-                       t.Errorf("At position %d: expected key %d, got %d", i, 
key, result.Keys[i])
-               }
-       }
-}
-
-func TestMergeQueryResponseShardsDesc(t *testing.T) {
-       shard1 := &QueryResponse{
-               Keys: []int64{9, 5, 1},
-               Data: [][]byte{[]byte("s1_9"), []byte("s1_5"), []byte("s1_1")},
-               Tags: [][]Tag{
-                       {{Name: "shard1", Value: []byte("val9"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard1", Value: []byte("val5"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard1", Value: []byte("val1"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{1, 1, 1},
-       }
-       shard2 := &QueryResponse{
-               Keys: []int64{10, 6, 2},
-               Data: [][]byte{[]byte("s2_10"), []byte("s2_6"), []byte("s2_2")},
-               Tags: [][]Tag{
-                       {{Name: "shard2", Value: []byte("val10"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard2", Value: []byte("val6"), ValueType: 
pbv1.ValueTypeStr}},
-                       {{Name: "shard2", Value: []byte("val2"), ValueType: 
pbv1.ValueTypeStr}},
-               },
-               SIDs: []common.SeriesID{2, 2, 2},
-       }
-
-       shards := []*QueryResponse{shard1, shard2}
-
-       result := mergeQueryResponseShardsDesc(shards, 100)
-
-       expectedKeys := []int64{2, 6, 10, 1, 5, 9}
-       if len(result.Keys) != len(expectedKeys) {
-               t.Fatalf("Expected %d keys, got %d", len(expectedKeys), 
len(result.Keys))
-       }
-
-       for i, key := range expectedKeys {
-               if result.Keys[i] != key {
-                       t.Errorf("At position %d: expected key %d, got %d", i, 
key, result.Keys[i])
-               }
-       }
-}
-
-func TestMergeQueryResponseShards_EmptyShards(t *testing.T) {
-       t.Run("all empty shards ascending", func(t *testing.T) {
-               emptyShards := []*QueryResponse{
-                       {Keys: []int64{}, Data: [][]byte{}, SIDs: 
[]common.SeriesID{}},
-                       {Keys: []int64{}, Data: [][]byte{}, SIDs: 
[]common.SeriesID{}},
-               }
-
-               result := mergeQueryResponseShardsAsc(emptyShards, 10)
-
-               if result.Len() != 0 {
-                       t.Error("All empty shards should produce empty result")
-               }
-       })
-
-       t.Run("all empty shards descending", func(t *testing.T) {
-               emptyShards := []*QueryResponse{
-                       {Keys: []int64{}, Data: [][]byte{}, SIDs: 
[]common.SeriesID{}},
-                       {Keys: []int64{}, Data: [][]byte{}, SIDs: 
[]common.SeriesID{}},
-               }
-
-               result := mergeQueryResponseShardsDesc(emptyShards, 10)
-
-               if result.Len() != 0 {
-                       t.Error("All empty shards should produce empty result")
-               }
-       })
-
-       t.Run("mixed empty and non-empty shards", func(t *testing.T) {
-               shards := []*QueryResponse{
-                       {Keys: []int64{}, Data: [][]byte{}, Tags: [][]Tag{}, 
SIDs: []common.SeriesID{}},
-                       {
-                               Keys: []int64{5, 10},
-                               Data: [][]byte{[]byte("a"), []byte("b")},
-                               Tags: [][]Tag{
-                                       {{Name: "mixed", Value: []byte("val5"), 
ValueType: pbv1.ValueTypeStr}},
-                                       {{Name: "mixed", Value: 
[]byte("val10"), ValueType: pbv1.ValueTypeStr}},
-                               },
-                               SIDs: []common.SeriesID{1, 1},
-                       },
-                       {Keys: []int64{}, Data: [][]byte{}, Tags: [][]Tag{}, 
SIDs: []common.SeriesID{}},
-               }
-
-               result := mergeQueryResponseShardsAsc(shards, 10)
-
-               if result.Len() != 2 {
-                       t.Errorf("Expected 2 elements from non-empty shard, got 
%d", result.Len())
-               }
-               if result.Keys[0] != 5 || result.Keys[1] != 10 {
-                       t.Error("Keys should be preserved from non-empty shard")
-               }
-       })
-}
-
-// Benchmark tests for performance validation.
-func BenchmarkQueryResponseHeap_MergeAscending(b *testing.B) {
-       shard1 := createBenchmarkResponse(1000, 1, 2)
-       shard2 := createBenchmarkResponse(1000, 2, 2)
-       shard3 := createBenchmarkResponse(1000, 3, 2)
-       shards := []*QueryResponse{shard1, shard2, shard3}
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               result := mergeQueryResponseShardsAsc(shards, 3000)
-               if result.Len() == 0 {
-                       b.Error("Benchmark should produce non-empty result")
-               }
-       }
-}
-
-func BenchmarkQueryResponseHeap_MergeDescending(b *testing.B) {
-       shard1 := createBenchmarkResponseDesc(1000, 1, 2)
-       shard2 := createBenchmarkResponseDesc(1000, 2, 2)
-       shard3 := createBenchmarkResponseDesc(1000, 3, 2)
-       shards := []*QueryResponse{shard1, shard2, shard3}
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               result := mergeQueryResponseShardsDesc(shards, 3000)
-               if result.Len() == 0 {
-                       b.Error("Benchmark should produce non-empty result")
-               }
-       }
-}
-
-func BenchmarkQueryResponseHeap_LargeMerge(b *testing.B) {
-       const numShards = 10
-       const elementsPerShard = 10000
-       shards := make([]*QueryResponse, numShards)
-
-       for i := 0; i < numShards; i++ {
-               shards[i] = createBenchmarkResponse(elementsPerShard, int64(i), 
int64(numShards))
-       }
-
-       b.ResetTimer()
-       for i := 0; i < b.N; i++ {
-               result := mergeQueryResponseShardsAsc(shards, 
elementsPerShard*numShards)
-               if result.Len() == 0 {
-                       b.Error("Benchmark should produce non-empty result")
-               }
-       }
-}
-
-func createBenchmarkResponse(size int, offset, step int64) *QueryResponse {
-       response := &QueryResponse{
-               Keys: make([]int64, size),
-               Data: make([][]byte, size),
-               Tags: make([][]Tag, size),
-               SIDs: make([]common.SeriesID, size),
-       }
-
-       for i := 0; i < size; i++ {
-               key := offset + int64(i)*step
-               response.Keys[i] = key
-               response.Data[i] = []byte("benchmark_data")
-               response.Tags[i] = []Tag{{Name: "benchmark", Value: 
[]byte("value"), ValueType: pbv1.ValueTypeStr}}
-               response.SIDs[i] = common.SeriesID(offset)
-       }
-
-       return response
-}
-
-func createBenchmarkResponseDesc(size int, offset, step int64) *QueryResponse {
-       response := &QueryResponse{
-               Keys: make([]int64, size),
-               Data: make([][]byte, size),
-               Tags: make([][]Tag, size),
-               SIDs: make([]common.SeriesID, size),
-       }
-
-       for i := 0; i < size; i++ {
-               key := offset + int64(size-1-i)*step
-               response.Keys[i] = key
-               response.Data[i] = []byte("benchmark_data")
-               response.Tags[i] = []Tag{{Name: "benchmark", Value: 
[]byte("value"), ValueType: pbv1.ValueTypeStr}}
-               response.SIDs[i] = common.SeriesID(offset)
-       }
-
-       return response
-}
-
-// Tests for queryResult struct using memPart datasets.
-
-func TestQueryResult_Pull_SingleMemPart(t *testing.T) {
-       // Create test dataset with multiple series
-       expectedElements := []testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 2, userKey: 200, data: []byte("data2"), tags: 
[]tag{{name: "service", value: []byte("service2"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 3, userKey: 300, data: []byte("data3"), tags: 
[]tag{{name: "environment", value: []byte("prod"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       }
-
-       elements := createTestElements(expectedElements)
-       defer releaseElements(elements)
-
-       // Create memory part from elements
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       // Create part from memory part
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       // Create snapshot and parts
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       // Create mock memory protector
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
-
-       // Create query request with default MaxElementSize (0) to test the fix
-       req := QueryRequest{
-               SeriesIDs:      []common.SeriesID{1, 2, 3},
-               MinKey:         nil,
-               MaxKey:         nil,
-               Filter:         nil,
-               Order:          nil,
-               MaxElementSize: 0, // Test with default value (0) to ensure fix 
works
-       }
-
-       // Debug: log the request parameters
-       t.Logf("Request MaxElementSize: %d", req.MaxElementSize)
-       t.Logf("Element keys: %v", []int64{100, 200, 300})
-       t.Logf("Key range filter: [%d, %d]", 50, 400)
-
-       // Create block scanner
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    400,
-               asc:       true,
-       }
-
-       // Create query result
-       qr := &queryResult{
-               request:  req,
-               ctx:      context.Background(),
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      true,
-               released: false,
-       }
-
-       // Debug: Test if the iterator can find blocks in this memPart
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-
-       it := generateIter()
-       defer releaseIter(it)
-
-       it.init(bma, parts, req.SeriesIDs, 50, 400, nil)
-
-       blockCount := 0
-       for it.nextBlock() {
-               blockCount++
-       }
-       t.Logf("Iterator found %d blocks", blockCount)
-       if it.Error() != nil {
-               t.Logf("Iterator error: %v", it.Error())
-       }
-
-       // Pull the response
-       response := qr.Pull()
-
-       // Debug: check if response is nil first
-       t.Logf("Response is nil: %v", response == nil)
-
-       // Verify response is not nil
-       if response == nil {
-               t.Fatal("QueryResult.Pull() returned nil response")
-       }
-
-       // Check for errors in response
-       if response.Error != nil {
-               t.Fatalf("QueryResult returned error: %v", response.Error)
-       }
-
-       // Verify response has the expected number of elements
-       if response.Len() != len(expectedElements) {
-               t.Fatalf("Expected %d elements in response, got %d", 
len(expectedElements), response.Len())
-       }
-
-       // Verify data consistency
-       err := response.Validate()
-       if err != nil {
-               t.Fatalf("Response validation failed: %v", err)
-       }
-
-       // Create a map of expected data by key for easy lookup
-       expectedByKey := make(map[int64]testElement)
-       for _, elem := range expectedElements {
-               expectedByKey[elem.userKey] = elem
-       }
-
-       // Verify each returned element matches expected data
-       for i := 0; i < response.Len(); i++ {
-               key := response.Keys[i]
-               data := response.Data[i]
-               sid := response.SIDs[i]
-               tags := response.Tags[i]
-
-               // Find expected element by key
-               expected, found := expectedByKey[key]
-               if !found {
-                       t.Errorf("Unexpected key %d found in response at 
position %d", key, i)
-                       continue
-               }
-
-               // Verify series ID matches
-               if sid != expected.seriesID {
-                       t.Errorf("At position %d: expected seriesID %d, got 
%d", i, expected.seriesID, sid)
-               }
-
-               // Verify data matches
-               if !bytes.Equal(data, expected.data) {
-                       t.Errorf("At position %d: expected data %s, got %s", i, 
string(expected.data), string(data))
-               }
-
-               // Verify tags match
-               if len(tags) != len(expected.tags) {
-                       t.Errorf("At position %d: expected %d tags, got %d", i, 
len(expected.tags), len(tags))
-                       continue
-               }
-
-               for j, tag := range tags {
-                       expectedTag := expected.tags[j]
-                       if tag.Name != expectedTag.name {
-                               t.Errorf("At position %d, tag %d: expected name 
%s, got %s", i, j, expectedTag.name, tag.Name)
-                       }
-                       if !bytes.Equal(tag.Value, expectedTag.value) {
-                               t.Errorf("At position %d, tag %d: expected 
value %s, got %s", i, j, string(expectedTag.value), string(tag.Value))
-                       }
-                       if tag.ValueType != expectedTag.valueType {
-                               t.Errorf("At position %d, tag %d: expected 
valueType %v, got %v", i, j, expectedTag.valueType, tag.ValueType)
-                       }
-               }
-       }
-
-       // Verify keys are within expected range and properly sorted
-       for i, key := range response.Keys {
-               if key < 50 || key > 400 {
-                       t.Errorf("Key %d at position %d is outside expected 
range [50, 400]", key, i)
-               }
-               if i > 0 && key <= response.Keys[i-1] {
-                       t.Errorf("Keys not properly sorted: key %d at position 
%d should be greater than previous key %d", key, i, response.Keys[i-1])
-               }
-       }
-
-       // Clean up
-       qr.Release()
-}
-
-func TestQueryResult_Pull_MultipleMemParts(t *testing.T) {
-       // Create multiple test datasets with expected elements
-       expectedElements := []testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1_part1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 2, userKey: 200, data: []byte("data2_part2"), tags: 
[]tag{{name: "service", value: []byte("service2"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 3, userKey: 300, data: []byte("data3_part1"), tags: 
[]tag{{name: "service", value: []byte("service3"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 4, userKey: 400, data: []byte("data4_part2"), tags: 
[]tag{{name: "environment", value: []byte("test"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       }
-
-       // Create test elements for each part
-       elements1 := createTestElements([]testElement{expectedElements[0], 
expectedElements[2]}) // data1_part1, data3_part1
-       defer releaseElements(elements1)
-
-       elements2 := createTestElements([]testElement{expectedElements[1], 
expectedElements[3]}) // data2_part2, data4_part2
-       defer releaseElements(elements2)
-
-       // Create memory parts
-       mp1 := generateMemPart()
-       defer releaseMemPart(mp1)
-       mp1.mustInitFromElements(elements1)
-
-       mp2 := generateMemPart()
-       defer releaseMemPart(mp2)
-       mp2.mustInitFromElements(elements2)
-
-       // Create parts from memory parts
-       testPart1 := openMemPart(mp1)
-       defer testPart1.close()
-       testPart2 := openMemPart(mp2)
-       defer testPart2.close()
-
-       // Create snapshot with multiple parts
-       parts := []*part{testPart1, testPart2}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       // Create mock memory protector
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
-
-       // Create query request targeting all series
-       req := QueryRequest{
-               SeriesIDs:      []common.SeriesID{1, 2, 3, 4},
-               MinKey:         nil,
-               MaxKey:         nil,
-               Filter:         nil,
-               Order:          nil,
-               MaxElementSize: 0, // Test with default value (0) to ensure fix 
works
-       }
-
-       // Create block scanner
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    500,
-               asc:       true,
-       }
-
-       // Create query result
-       qr := &queryResult{
-               request:  req,
-               ctx:      context.Background(),
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      true,
-               released: false,
-       }
-
-       // Pull the response
-       response := qr.Pull()
-
-       // Verify response is not nil
-       if response == nil {
-               t.Fatal("QueryResult.Pull() returned nil response")
-       }
-
-       // Check for errors in response
-       if response.Error != nil {
-               t.Fatalf("QueryResult returned error: %v", response.Error)
-       }
-
-       // Verify response has the expected number of elements
-       if response.Len() != len(expectedElements) {
-               t.Fatalf("Expected %d elements in response, got %d", 
len(expectedElements), response.Len())
-       }
-
-       // Verify data consistency
-       err := response.Validate()
-       if err != nil {
-               t.Fatalf("Response validation failed: %v", err)
-       }
-
-       // Create a map of expected data by key for easy lookup
-       expectedByKey := make(map[int64]testElement)
-       for _, elem := range expectedElements {
-               expectedByKey[elem.userKey] = elem
-       }
-
-       // Verify each returned element matches expected data
-       for i := 0; i < response.Len(); i++ {
-               key := response.Keys[i]
-               data := response.Data[i]
-               sid := response.SIDs[i]
-               tags := response.Tags[i]
-
-               // Find expected element by key
-               expected, found := expectedByKey[key]
-               if !found {
-                       t.Errorf("Unexpected key %d found in response at 
position %d", key, i)
-                       continue
-               }
-
-               // Verify series ID matches
-               if sid != expected.seriesID {
-                       t.Errorf("At position %d: expected seriesID %d, got 
%d", i, expected.seriesID, sid)
-               }
-
-               // Verify data matches
-               if !bytes.Equal(data, expected.data) {
-                       t.Errorf("At position %d: expected data %s, got %s", i, 
string(expected.data), string(data))
-               }
-
-               // Verify tags match
-               if len(tags) != len(expected.tags) {
-                       t.Errorf("At position %d: expected %d tags, got %d", i, 
len(expected.tags), len(tags))
-                       continue
-               }
-
-               for j, tag := range tags {
-                       expectedTag := expected.tags[j]
-                       if tag.Name != expectedTag.name {
-                               t.Errorf("At position %d, tag %d: expected name 
%s, got %s", i, j, expectedTag.name, tag.Name)
-                       }
-                       if !bytes.Equal(tag.Value, expectedTag.value) {
-                               t.Errorf("At position %d, tag %d: expected 
value %s, got %s", i, j, string(expectedTag.value), string(tag.Value))
-                       }
-                       if tag.ValueType != expectedTag.valueType {
-                               t.Errorf("At position %d, tag %d: expected 
valueType %v, got %v", i, j, expectedTag.valueType, tag.ValueType)
-                       }
-               }
-       }
-
-       // Verify keys are within expected range and properly sorted
-       for i, key := range response.Keys {
-               if key < 50 || key > 500 {
-                       t.Errorf("Key %d at position %d is outside expected 
range [50, 500]", key, i)
-               }
-               if i > 0 && key <= response.Keys[i-1] {
-                       t.Errorf("Keys not properly sorted: key %d at position 
%d should be greater than previous key %d", key, i, response.Keys[i-1])
-               }
-       }
-
-       // Clean up
-       qr.Release()
-}
-
-func TestQueryResult_Pull_WithTagProjection(t *testing.T) {
-       // Create test dataset with multiple tags
-       elements := createTestElements([]testElement{
-               {
-                       seriesID: 1, userKey: 100, data: []byte("data1"),
-                       tags: []tag{
-                               {name: "service", value: []byte("service1"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                               {name: "environment", value: []byte("prod"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                               {name: "region", value: []byte("us-west"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                       },
-               },
-               {
-                       seriesID: 2, userKey: 200, data: []byte("data2"),
-                       tags: []tag{
-                               {name: "service", value: []byte("service2"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                               {name: "environment", value: []byte("test"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                               {name: "region", value: []byte("us-east"), 
valueType: pbv1.ValueTypeStr, indexed: true},
-                       },
-               },
-       })
-       defer releaseElements(elements)
-
-       // Create memory part
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       // Create snapshot
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
-
-       // Create query request with tag projection (only load "service" and 
"environment")
-       req := QueryRequest{
-               SeriesIDs: []common.SeriesID{1, 2},
-               TagProjection: []model.TagProjection{
-                       {Names: []string{"service", "environment"}},
-               },
-               MinKey: nil,
-               MaxKey: nil,
-               Filter: nil,
-       }
-
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    300,
-               asc:       true,
-       }
-
-       qr := &queryResult{
-               request:  req,
-               ctx:      context.Background(),
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      true,
-               released: false,
-       }
-
-       // Test Pull with tag projection
-       response := qr.Pull()
-
-       if response == nil {
-               t.Fatal("Expected non-nil response")
-       }
-       if response.Error != nil {
-               t.Fatalf("Expected no error, got: %v", response.Error)
-       }
-
-       // Verify response has data
-       if response.Len() == 0 {
-               t.Error("Expected non-empty response")
-       }
-
-       // Verify tag projection worked (should only have projected tags)
-       // Note: The actual tag filtering logic needs to be verified based on 
implementation
-       for i, tagGroup := range response.Tags {
-               for _, tag := range tagGroup {
-                       if tag.Name != "service" && tag.Name != "environment" {
-                               t.Errorf("Unexpected tag '%s' found at position 
%d, should only have projected tags", tag.Name, i)
-                       }
-               }
-       }
-
-       qr.Release()
-}
-
-func TestQueryResult_Pull_DescendingOrder(t *testing.T) {
-       // Create test dataset
-       elements := createTestElements([]testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 1, userKey: 200, data: []byte("data2"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-               {seriesID: 1, userKey: 300, data: []byte("data3"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       })
-       defer releaseElements(elements)
-
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
-
-       req := QueryRequest{
-               SeriesIDs: []common.SeriesID{1},
-               MinKey:    nil,
-               MaxKey:    nil,
-               Filter:    nil,
-       }
-
-       // Create descending order scanner
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    400,
-               asc:       false, // Descending order
-       }
-
-       qr := &queryResult{
-               request:  req,
-               ctx:      context.Background(),
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      false, // Descending order
-               released: false,
-       }
-
-       response := qr.Pull()
-
-       if response == nil {
-               t.Fatal("Expected non-nil response")
-       }
-       if response.Error != nil {
-               t.Fatalf("Expected no error, got: %v", response.Error)
-       }
-
-       // Verify descending order
-       for i := 1; i < len(response.Keys); i++ {
-               if response.Keys[i] > response.Keys[i-1] {
-                       t.Errorf("Keys are not in descending order: %d > %d at 
positions %d, %d",
-                               response.Keys[i], response.Keys[i-1], i, i-1)
-               }
-       }
-
-       qr.Release()
-}
-
-func TestQueryResult_Pull_ErrorHandling(t *testing.T) {
-       // Test with quota exceeded scenario
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: true}
-
-       elements := createTestElements([]testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       })
-       defer releaseElements(elements)
-
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       req := QueryRequest{
-               SeriesIDs: []common.SeriesID{1},
-       }
-
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    200,
-               asc:       true,
-       }
-
-       qr := &queryResult{
-               request:  req,
-               ctx:      context.Background(),
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      true,
-               released: false,
-       }
-
-       response := qr.Pull()
-
-       // With quota exceeded, we expect either an error response or a 
response with an error
-       if response != nil && response.Error != nil {
-               t.Logf("Expected error due to quota exceeded: %v", 
response.Error)
-       }
-
-       qr.Release()
-}
-
-func TestQueryResult_Release(t *testing.T) {
-       elements := createTestElements([]testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       })
-       defer releaseElements(elements)
-
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       // Create a mock blockScanner to verify it gets closed
-       mockBS := &blockScanner{
-               pm:        &test.MockMemoryProtector{},
-               filter:    nil,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: []common.SeriesID{1},
-               minKey:    50,
-               maxKey:    200,
-               asc:       true,
-       }
-
-       // Create a mock protector
-       mockProtector := &test.MockMemoryProtector{}
-
-       qr := &queryResult{
-               request:    QueryRequest{SeriesIDs: []common.SeriesID{1}},
-               ctx:        context.Background(),
-               pm:         mockProtector,
-               snapshot:   snap,
-               bs:         mockBS,
-               l:          logger.GetLogger("test-queryResult"),
-               tagsToLoad: map[string]struct{}{"service": {}},
-               shards:     []*QueryResponse{},
-               asc:        true,
-               released:   false,
-       }
-
-       // Verify initial state
-       if qr.released {
-               t.Error("queryResult should not be released initially")
-       }
-       if qr.snapshot == nil {
-               t.Error("queryResult should have a snapshot initially")
-       }
-       if qr.bs == nil {
-               t.Error("queryResult should have a blockScanner initially")
-       }
-       if qr.tagsToLoad == nil {
-               t.Error("queryResult should have tagsToLoad initially")
-       }
-       if qr.shards == nil {
-               t.Error("queryResult should have shards initially")
-       }
-
-       // Store initial reference count for verification
-       initialRef := snap.ref
-
-       // Release
-       qr.Release()
-
-       // Verify released flag is set
-       if !qr.released {
-               t.Error("queryResult should be marked as released")
-       }
-
-       // Verify snapshot is nullified and reference count is decremented
-       if qr.snapshot != nil {
-               t.Error("queryResult.snapshot should be nullified after 
release")
-       }
-       if snap.ref != initialRef-1 {
-               t.Errorf("snapshot reference count should be decremented from 
%d to %d, got %d", initialRef, initialRef-1, snap.ref)
-       }
-
-       // Verify blockScanner is closed but not nullified (it gets closed in 
Release method but not set to nil)
-       if qr.bs == nil {
-               t.Error("queryResult.bs should not be nullified after release, 
only closed")
-       }
-       // Note: We can't easily verify that bs.close() was called without 
exposing internal state
-       // The important thing is that the blockScanner is still accessible for 
potential cleanup
-
-       // Verify other fields remain unchanged (they don't get nullified)
-       if qr.tagsToLoad == nil {
-               t.Error("queryResult.tagsToLoad should remain unchanged after 
release")
-       }
-       if qr.shards == nil {
-               t.Error("queryResult.shards should remain unchanged after 
release")
-       }
-       if qr.ctx == nil {
-               t.Error("queryResult.ctx should remain unchanged after release")
-       }
-       if qr.pm == nil {
-               t.Error("queryResult.pm should remain unchanged after release")
-       }
-       if qr.l == nil {
-               t.Error("queryResult.l should remain unchanged after release")
-       }
-
-       // Verify that subsequent calls to Release are safe (idempotent)
-       qr.Release()
-       if !qr.released {
-               t.Error("queryResult should remain released after second 
release call")
-       }
-       if qr.snapshot != nil {
-               t.Error("queryResult.snapshot should remain nullified after 
second release call")
-       }
-       if qr.bs == nil {
-               t.Error("queryResult.bs should remain accessible after second 
release call")
-       }
-}
-
-func TestQueryResult_Pull_ContextCancellation(t *testing.T) {
-       elements := createTestElements([]testElement{
-               {seriesID: 1, userKey: 100, data: []byte("data1"), tags: 
[]tag{{name: "service", value: []byte("service1"), valueType: 
pbv1.ValueTypeStr, indexed: true}}},
-       })
-       defer releaseElements(elements)
-
-       mp := generateMemPart()
-       defer releaseMemPart(mp)
-       mp.mustInitFromElements(elements)
-
-       testPart := openMemPart(mp)
-       defer testPart.close()
-
-       parts := []*part{testPart}
-       snap := &snapshot{}
-       snap.parts = make([]*partWrapper, len(parts))
-       for i, p := range parts {
-               snap.parts[i] = newPartWrapper(nil, p)
-       }
-       // Initialize reference count to 1
-       snap.ref = 1
-
-       mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false}
-
-       // Create canceled context
-       ctx, cancel := context.WithCancel(context.Background())
-       cancel() // Cancel immediately
-
-       req := QueryRequest{
-               SeriesIDs: []common.SeriesID{1},
-       }
-
-       bs := &blockScanner{
-               pm:        mockProtector,
-               filter:    req.Filter,
-               l:         logger.GetLogger("test"),
-               parts:     parts,
-               seriesIDs: req.SeriesIDs,
-               minKey:    50,
-               maxKey:    200,
-               asc:       true,
-       }
-
-       qr := &queryResult{
-               request:  req,
-               ctx:      ctx,
-               pm:       mockProtector,
-               snapshot: snap,
-               bs:       bs,
-               l:        logger.GetLogger("test-queryResult"),
-               asc:      true,
-               released: false,
-       }
-
-       response := qr.Pull()
-
-       // With canceled context, we should handle gracefully
-       // The behavior depends on implementation - it might return nil or an 
error response
-       if response != nil {
-               t.Logf("Response with canceled context: error=%v, len=%d", 
response.Error, response.Len())
-       } else {
-               t.Log("Got nil response with canceled context - this is 
acceptable")
-       }
-
-       qr.Release()
-}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index fa9150a4..cb6f9722 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -24,9 +24,12 @@ import (
        "sync"
        "sync/atomic"
 
+       "go.uber.org/multierr"
+
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -212,20 +215,17 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, err
                asc:       asc,
        }
 
-       // Create queryResult and call Pull() once to get the QueryResponse
-       qr := &queryResult{
-               snapshot: snap,
-               request:  req,
-               ctx:      ctx,
-               bs:       bs,
-               asc:      asc,
-               pm:       s.pm,
-               l:        s.l,
-       }
-       defer qr.Release()
-
-       // Pull once to get all results
-       response := qr.Pull()
+       // Execute query with worker pool pattern directly
+       defer func() {
+               if bs != nil {
+                       bs.close()
+               }
+               if snap != nil {
+                       snap.decRef()
+               }
+       }()
+
+       response := s.executeBlockScannerQuery(ctx, bs, req, asc)
        if response == nil {
                return &QueryResponse{
                        Keys: make([]int64, 0),
@@ -243,6 +243,119 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) (*QueryResponse, err
        return response, nil
 }
 
+// executeBlockScannerQuery coordinates the worker pool with block scanner following tsResult pattern.
+func (s *sidx) executeBlockScannerQuery(ctx context.Context, bs *blockScanner, req QueryRequest, asc bool) *QueryResponse {
+       workerSize := cgroups.CPUs()
+       batchCh := make(chan *blockScanResultBatch, workerSize)
+
+       // Determine which tags to load once for all workers (shared optimization)
+       tagsToLoad := make(map[string]struct{})
+       if len(req.TagProjection) > 0 {
+               // Load only projected tags
+               for _, proj := range req.TagProjection {
+                       for _, tagName := range proj.Names {
+                               tagsToLoad[tagName] = struct{}{}
+                       }
+               }
+       }
+
+       // Initialize worker result shards
+       shards := make([]*QueryResponse, workerSize)
+       for i := range shards {
+               shards[i] = &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }
+       }
+
+       // Launch worker pool
+       var workerWg sync.WaitGroup
+       workerWg.Add(workerSize)
+
+       for i := range workerSize {
+               go func(workerID int) {
+                       defer workerWg.Done()
+                       s.processWorkerBatches(workerID, batchCh, shards[workerID], tagsToLoad, req, s.pm)
+               }(i)
+       }
+
+       // Start block scanning
+       go func() {
+               bs.scan(ctx, batchCh)
+               close(batchCh)
+       }()
+
+       workerWg.Wait()
+
+       // Merge results from all workers
+       return s.mergeWorkerResults(shards, asc, req.MaxElementSize)
+}
+
+// processWorkerBatches processes batches in a worker goroutine.
+func (s *sidx) processWorkerBatches(
+       _ int, batchCh chan *blockScanResultBatch, shard *QueryResponse,
+       tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory,
+) {
+       tmpBlock := generateBlock()
+       defer releaseBlock(tmpBlock)
+
+       for batch := range batchCh {
+               if batch.err != nil {
+                       shard.Error = batch.err
+                       releaseBlockScanResultBatch(batch)
+                       continue
+               }
+
+               for _, bs := range batch.bss {
+                       if !s.loadAndProcessBlock(tmpBlock, bs, shard, tagsToLoad, req, pm) {
+                               // If load fails, continue with next block rather than stopping
+                               continue
+                       }
+               }
+
+               releaseBlockScanResultBatch(batch)
+       }
+}
+
+// mergeWorkerResults merges results from all worker shards with error handling.
+func (s *sidx) mergeWorkerResults(shards []*QueryResponse, asc bool, maxElementSize int) *QueryResponse {
+       // Check for errors first
+       var err error
+       for i := range shards {
+               if shards[i].Error != nil {
+                       err = multierr.Append(err, shards[i].Error)
+               }
+       }
+
+       if err != nil {
+               return &QueryResponse{Error: err}
+       }
+
+       // Merge results with ordering
+       if asc {
+               return mergeQueryResponseShardsAsc(shards, maxElementSize)
+       }
+       return mergeQueryResponseShardsDesc(shards, maxElementSize)
+}
+
+// loadAndProcessBlock delegates to the queryResult implementation for now.
+func (s *sidx) loadAndProcessBlock(
+       tmpBlock *block, bs blockScanResult, result *QueryResponse,
+       tagsToLoad map[string]struct{}, req QueryRequest, pm protector.Memory,
+) bool {
+       // Create a temporary queryResult to reuse existing logic
+       qr := &queryResult{
+               request:    req,
+               tagsToLoad: tagsToLoad,
+               pm:         pm,
+               l:          s.l,
+       }
+
+       return qr.loadAndProcessBlock(tmpBlock, bs, result)
+}
+
 // Stats implements SIDX interface.
 func (s *sidx) Stats(_ context.Context) (*Stats, error) {
        snap := s.currentSnapshot()
@@ -413,14 +526,3 @@ func newGC(_ *Options) *gc {
 func (g *gc) clean() {
        // TODO: Implement garbage collection
 }
-
-// emptyQueryResult represents an empty query result.
-type emptyQueryResult struct{}
-
-func (e *emptyQueryResult) Pull() *QueryResponse {
-       return nil
-}
-
-func (e *emptyQueryResult) Release() {
-       // Nothing to release
-}

Reply via email to