This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6880003a1f33c40a4bb9a2f9828af7ffeaf4f31b
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 25 11:41:32 2025 +0800

    Refactor QueryRequest structure: Replace Entities with SeriesIDs for 
improved query handling. Introduce MinKey and MaxKey for range queries, 
enhancing validation logic. Update related components and tests to accommodate 
new structure and ensure robust functionality.
---
 banyand/internal/sidx/interfaces.go          |  60 ++++--
 banyand/internal/sidx/interfaces_examples.go |  13 +-
 banyand/internal/sidx/mock_components.go     |  14 +-
 banyand/internal/sidx/part_wrapper.go        |  34 +++
 banyand/internal/sidx/part_wrapper_test.go   |  75 +++++++
 banyand/internal/sidx/query_result.go        | 300 +++++++++++++++++++++++++++
 banyand/internal/sidx/sidx.go                | 121 +++++++----
 banyand/stream/query_by_idx.go               |   3 +-
 banyand/stream/query_by_ts.go                |   6 +-
 9 files changed, 545 insertions(+), 81 deletions(-)

diff --git a/banyand/internal/sidx/interfaces.go 
b/banyand/internal/sidx/interfaces.go
index 6fa9df21..9950bbac 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -25,7 +25,6 @@ import (
        "sync/atomic"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
@@ -127,10 +126,10 @@ type QueryRequest struct {
        // Name identifies the series/index to query
        Name string
 
-       // Entities specifies entity filtering (same as StreamQueryOptions)
-       Entities [][]*modelv1.TagValue
+       // SeriesIDs specifies the series to query (provided externally)
+       SeriesIDs []common.SeriesID
 
-       // Filter for key range and tag-based filtering using index.Filter
+       // Filter for tag-based filtering using index.Filter
        // Note: sidx uses bloom filters for tag filtering, not inverted indexes
        Filter index.Filter
 
@@ -142,6 +141,12 @@ type QueryRequest struct {
 
        // MaxElementSize limits result size
        MaxElementSize int
+
+       // MinKey specifies the minimum key for range queries (nil = no limit)
+       MinKey *int64
+
+       // MaxKey specifies the maximum key for range queries (nil = no limit)
+       MaxKey *int64
 }
 
 // QueryResponse contains a batch of query results and execution metadata.
@@ -353,49 +358,47 @@ func (qr QueryRequest) Validate() error {
        if qr.Name == "" {
                return fmt.Errorf("name cannot be empty")
        }
+       if len(qr.SeriesIDs) == 0 {
+               return fmt.Errorf("at least one SeriesID is required")
+       }
        if qr.MaxElementSize < 0 {
                return fmt.Errorf("maxElementSize cannot be negative")
        }
+       // Validate key range
+       if qr.MinKey != nil && qr.MaxKey != nil && *qr.MinKey > *qr.MaxKey {
+               return fmt.Errorf("MinKey cannot be greater than MaxKey")
+       }
        // Validate tag projection names
        for i, projection := range qr.TagProjection {
                if projection.Family == "" {
                        return fmt.Errorf("tagProjection[%d] family cannot be 
empty", i)
                }
        }
-       // Validate entities structure
-       for i, entityGroup := range qr.Entities {
-               if len(entityGroup) == 0 {
-                       return fmt.Errorf("entities[%d] cannot be empty", i)
-               }
-               for j, tagValue := range entityGroup {
-                       if tagValue == nil {
-                               return fmt.Errorf("entities[%d][%d] cannot be 
nil", i, j)
-                       }
-               }
-       }
        return nil
 }
 
 // Reset resets the QueryRequest to its zero state.
 func (qr *QueryRequest) Reset() {
        qr.Name = ""
-       qr.Entities = nil
+       qr.SeriesIDs = nil
        qr.Filter = nil
        qr.Order = nil
        qr.TagProjection = nil
        qr.MaxElementSize = 0
+       qr.MinKey = nil
+       qr.MaxKey = nil
 }
 
 // CopyFrom copies the QueryRequest from other to qr.
 func (qr *QueryRequest) CopyFrom(other *QueryRequest) {
        qr.Name = other.Name
 
-       // Deep copy for Entities if it's a slice
-       if other.Entities != nil {
-               qr.Entities = make([][]*modelv1.TagValue, len(other.Entities))
-               copy(qr.Entities, other.Entities)
+       // Deep copy for SeriesIDs if it's a slice
+       if other.SeriesIDs != nil {
+               qr.SeriesIDs = make([]common.SeriesID, len(other.SeriesIDs))
+               copy(qr.SeriesIDs, other.SeriesIDs)
        } else {
-               qr.Entities = nil
+               qr.SeriesIDs = nil
        }
 
        qr.Filter = other.Filter
@@ -410,6 +413,21 @@ func (qr *QueryRequest) CopyFrom(other *QueryRequest) {
        }
 
        qr.MaxElementSize = other.MaxElementSize
+
+       // Copy key range pointers
+       if other.MinKey != nil {
+               minKey := *other.MinKey
+               qr.MinKey = &minKey
+       } else {
+               qr.MinKey = nil
+       }
+
+       if other.MaxKey != nil {
+               maxKey := *other.MaxKey
+               qr.MaxKey = &maxKey
+       } else {
+               qr.MaxKey = nil
+       }
 }
 
 // Interface Usage Examples and Best Practices
diff --git a/banyand/internal/sidx/interfaces_examples.go 
b/banyand/internal/sidx/interfaces_examples.go
index 47b802a3..bfb03ff3 100644
--- a/banyand/internal/sidx/interfaces_examples.go
+++ b/banyand/internal/sidx/interfaces_examples.go
@@ -27,7 +27,6 @@ import (
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -107,14 +106,10 @@ func (e *InterfaceUsageExamples) BasicWriteExample(ctx 
context.Context) error {
 func (e *InterfaceUsageExamples) AdvancedQueryExample(ctx context.Context) 
error {
        // Create query request with range and tag filtering
        queryReq := QueryRequest{
-               Name: "trace-sidx",
-               Entities: [][]*modelv1.TagValue{
-                       {
-                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "user-service"}}},
-                       },
-               },
-               Filter: nil, // In production, use actual index.Filter 
implementation
-               Order:  nil, // In production, use actual index.OrderBy 
implementation
+               Name:      "trace-sidx",
+               SeriesIDs: []common.SeriesID{1, 2, 3}, // Example series IDs
+               Filter:    nil,                        // In production, use 
actual index.Filter implementation
+               Order:     nil,                        // In production, use 
actual index.OrderBy implementation
                TagProjection: []model.TagProjection{
                        {Family: "service", Names: []string{"name"}},
                        {Family: "endpoint", Names: []string{"path"}},
diff --git a/banyand/internal/sidx/mock_components.go 
b/banyand/internal/sidx/mock_components.go
index 554b78d9..2e9894e4 100644
--- a/banyand/internal/sidx/mock_components.go
+++ b/banyand/internal/sidx/mock_components.go
@@ -272,11 +272,15 @@ func (mq *MockQuerier) matchesQuery(elem WriteRequest, 
req QueryRequest) bool {
        // Basic range filtering - in reality this would be more sophisticated
        // For the mock, we'll match all elements unless specific filters are 
applied
 
-       // Apply entity filtering if specified
-       if len(req.Entities) > 0 {
-               // Simplified entity matching for the mock
-               // In practice, this would properly evaluate entity constraints
-               _ = elem // Use elem to avoid unused parameter warning
+       // Apply SeriesID filtering if specified
+       if len(req.SeriesIDs) > 0 {
+               // Check if the element's SeriesID is in the requested list
+               for _, sid := range req.SeriesIDs {
+                       if elem.SeriesID == sid {
+                               return true
+                       }
+               }
+               return false
        }
 
        // For the mock, we'll accept all elements that pass basic checks
diff --git a/banyand/internal/sidx/part_wrapper.go 
b/banyand/internal/sidx/part_wrapper.go
index c5c41137..c3699148 100644
--- a/banyand/internal/sidx/part_wrapper.go
+++ b/banyand/internal/sidx/part_wrapper.go
@@ -222,3 +222,37 @@ func (pw *partWrapper) String() string {
        return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}",
                pw.ID(), state, refCount, pw.p.path)
 }
+
+// overlapsKeyRange checks if the part overlaps with the given key range.
+// Returns true if there is any overlap between the part's key range and the 
query range.
+// Uses part metadata to perform efficient range filtering without I/O.
+func (pw *partWrapper) overlapsKeyRange(minKey, maxKey int64) bool {
+       if pw.p == nil {
+               return false
+       }
+
+       // Validate input range
+       if minKey > maxKey {
+               return false
+       }
+
+       // Check if part metadata is available
+       if pw.p.partMetadata == nil {
+               // If no metadata available, assume overlap to be safe
+               // This ensures we don't skip parts that might contain relevant 
data
+               return true
+       }
+
+       pm := pw.p.partMetadata
+
+       // Check for non-overlapping ranges using De Morgan's law:
+       // Two ranges [a,b] and [c,d] don't overlap if: b < c OR a > d
+       // Therefore, they DO overlap if: NOT(b < c OR a > d) = (b >= c AND a 
<= d)
+       // Simplified: part.MaxKey >= query.MinKey AND part.MinKey <= 
query.MaxKey
+       if pm.MaxKey < minKey || pm.MinKey > maxKey {
+               return false
+       }
+
+       // Ranges overlap
+       return true
+}
diff --git a/banyand/internal/sidx/part_wrapper_test.go 
b/banyand/internal/sidx/part_wrapper_test.go
index c05b5e87..42bfebd1 100644
--- a/banyand/internal/sidx/part_wrapper_test.go
+++ b/banyand/internal/sidx/part_wrapper_test.go
@@ -514,3 +514,78 @@ func TestPartWrapper_CleanupIdempotency(t *testing.T) {
        assert.True(t, pw.isRemoved())
        assert.Equal(t, int32(0), pw.refCount())
 }
+
+func TestPartWrapper_OverlapsKeyRange(t *testing.T) {
+       tests := []struct {
+               name     string
+               partMin  int64
+               partMax  int64
+               queryMin int64
+               queryMax int64
+               expected bool
+       }{
+               {"complete_overlap", 10, 20, 5, 25, true},
+               {"part_inside_query", 10, 20, 5, 25, true},
+               {"query_inside_part", 5, 25, 10, 20, true},
+               {"partial_overlap_left", 10, 20, 15, 25, true},
+               {"partial_overlap_right", 10, 20, 5, 15, true},
+               {"adjacent_left_no_overlap", 10, 20, 1, 9, false},
+               {"adjacent_right_no_overlap", 10, 20, 21, 30, false},
+               {"touching_left_boundary", 10, 20, 5, 10, true},
+               {"touching_right_boundary", 10, 20, 20, 25, true},
+               {"completely_separate_left", 10, 20, 1, 5, false},
+               {"completely_separate_right", 10, 20, 25, 30, false},
+               {"exact_match", 10, 20, 10, 20, true},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create part with metadata
+                       p := &part{
+                               path: "/test/part/001",
+                               partMetadata: &partMetadata{
+                                       ID:     1,
+                                       MinKey: tt.partMin,
+                                       MaxKey: tt.partMax,
+                               },
+                       }
+                       pw := newPartWrapper(p)
+
+                       result := pw.overlapsKeyRange(tt.queryMin, tt.queryMax)
+                       assert.Equal(t, tt.expected, result,
+                               "part[%d,%d] query[%d,%d] should be %v",
+                               tt.partMin, tt.partMax, tt.queryMin, 
tt.queryMax, tt.expected)
+               })
+       }
+}
+
+func TestPartWrapper_OverlapsKeyRange_EdgeCases(t *testing.T) {
+       t.Run("nil_part", func(t *testing.T) {
+               pw := newPartWrapper(nil)
+               assert.False(t, pw.overlapsKeyRange(10, 20))
+       })
+
+       t.Run("nil_metadata", func(t *testing.T) {
+               p := &part{
+                       path:         "/test/part/001",
+                       partMetadata: nil,
+               }
+               pw := newPartWrapper(p)
+               // Should return true (safe default) when metadata is 
unavailable
+               assert.True(t, pw.overlapsKeyRange(10, 20))
+       })
+
+       t.Run("invalid_query_range", func(t *testing.T) {
+               p := &part{
+                       path: "/test/part/001",
+                       partMetadata: &partMetadata{
+                               ID:     1,
+                               MinKey: 10,
+                               MaxKey: 20,
+                       },
+               }
+               pw := newPartWrapper(p)
+               // Query range where min > max should return false
+               assert.False(t, pw.overlapsKeyRange(25, 15))
+       })
+}
diff --git a/banyand/internal/sidx/query_result.go 
b/banyand/internal/sidx/query_result.go
new file mode 100644
index 00000000..6c3ea5a9
--- /dev/null
+++ b/banyand/internal/sidx/query_result.go
@@ -0,0 +1,300 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "context"
+       "sync"
+
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// queryResult implements QueryResult interface with worker pool pattern.
+// Following the tsResult architecture from the stream module.
+type queryResult struct {
+       // Core query components
+       snapshot *snapshot    // Reference to current snapshot
+       request  QueryRequest // Original query request (contains all 
parameters)
+       ctx      context.Context
+
+       // Block scanning components
+       bs    *blockScanner // Block scanner for iteration
+       parts []*part       // Selected parts for query
+
+       // Worker coordination
+       shards []*QueryResponse // Result shards from parallel workers
+       pm     protector.Memory // Memory quota management
+       l      *logger.Logger   // Logger instance
+
+       // Query parameters (derived from request)
+       asc bool // Ordering direction
+
+       // State management
+       released bool
+}
+
+// Pull returns the next batch of query results using parallel worker 
processing.
+func (qr *queryResult) Pull() *QueryResponse {
+       if qr.released || qr.bs == nil {
+               return nil
+       }
+
+       return qr.runBlockScanner()
+}
+
+// runBlockScanner coordinates the worker pool with block scanner following 
tsResult pattern.
+func (qr *queryResult) runBlockScanner() *QueryResponse {
+       workerSize := cgroups.CPUs()
+       batchCh := make(chan *blockScanResultBatch, workerSize)
+
+       // Initialize worker result shards
+       if qr.shards == nil {
+               qr.shards = make([]*QueryResponse, workerSize)
+               for i := range qr.shards {
+                       qr.shards[i] = &QueryResponse{
+                               Keys: make([]int64, 0),
+                               Data: make([][]byte, 0),
+                               Tags: make([][]Tag, 0),
+                               SIDs: make([]common.SeriesID, 0),
+                       }
+               }
+       } else {
+               // Reset existing shards
+               for i := range qr.shards {
+                       qr.shards[i].Reset()
+               }
+       }
+
+       // Launch worker pool
+       var workerWg sync.WaitGroup
+       workerWg.Add(workerSize)
+
+       for i := range workerSize {
+               go func(workerID int) {
+                       defer workerWg.Done()
+                       qr.processWorkerBatches(workerID, batchCh)
+               }(i)
+       }
+
+       // Start block scanning
+       go func() {
+               qr.bs.scan(qr.ctx, batchCh)
+               close(batchCh)
+       }()
+
+       workerWg.Wait()
+
+       // Check for completion
+       if len(qr.bs.parts) == 0 {
+               qr.bs.close()
+               qr.bs = nil
+       }
+
+       // Merge results from all workers
+       return qr.mergeWorkerResults()
+}
+
+// processWorkerBatches processes batches in a worker goroutine.
+func (qr *queryResult) processWorkerBatches(workerID int, batchCh chan 
*blockScanResultBatch) {
+       tmpBlock := generateBlock()
+       defer releaseBlock(tmpBlock)
+
+       for batch := range batchCh {
+               if batch.err != nil {
+                       qr.shards[workerID].Error = batch.err
+                       releaseBlockScanResultBatch(batch)
+                       continue
+               }
+
+               for _, bs := range batch.bss {
+                       qr.loadAndProcessBlock(tmpBlock, bs, 
qr.shards[workerID])
+               }
+
+               releaseBlockScanResultBatch(batch)
+       }
+}
+
+// loadAndProcessBlock loads a block from part and processes it into 
QueryResponse format.
+func (qr *queryResult) loadAndProcessBlock(tmpBlock *block, bs 
blockScanResult, result *QueryResponse) bool {
+       tmpBlock.reset()
+
+       // Load block data from part (similar to stream's loadBlockCursor)
+       if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) {
+               return false
+       }
+
+       // Convert block data to QueryResponse format
+       qr.convertBlockToResponse(tmpBlock, bs.bm.seriesID, result)
+
+       return true
+}
+
+// loadBlockData loads block data from part using block metadata.
+func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm 
*blockMetadata) bool {
+       // TODO: Implement actual block loading from part
+       // This should use the existing block reader functionality
+       _ = tmpBlock // TODO: use when implementing block loading
+       _ = p        // TODO: use when implementing block loading
+       _ = bm       // TODO: use when implementing block loading
+       // For now, return false to avoid processing
+       return false
+}
+
+// convertBlockToResponse converts SIDX block data to QueryResponse format.
+func (qr *queryResult) convertBlockToResponse(block *block, seriesID 
common.SeriesID, result *QueryResponse) {
+       elemCount := len(block.userKeys)
+
+       for i := 0; i < elemCount; i++ {
+               // Apply MaxElementSize limit from request
+               if result.Len() >= qr.request.MaxElementSize {
+                       break
+               }
+
+               // Copy parallel arrays
+               result.Keys = append(result.Keys, block.userKeys[i])
+               result.Data = append(result.Data, block.data[i])
+               result.SIDs = append(result.SIDs, seriesID)
+
+               // Convert tag map to tag slice for this element
+               elementTags := qr.extractElementTags(block, i)
+               result.Tags = append(result.Tags, elementTags)
+       }
+}
+
+// extractElementTags extracts tags for a specific element with projection 
support.
+func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag {
+       var elementTags []Tag
+
+       // Apply tag projection from request
+       if len(qr.request.TagProjection) > 0 {
+               elementTags = make([]Tag, 0, len(qr.request.TagProjection))
+               for _, proj := range qr.request.TagProjection {
+                       for _, tagName := range proj.Names {
+                               if tagData, exists := block.tags[tagName]; 
exists && elemIndex < len(tagData.values) {
+                                       elementTags = append(elementTags, Tag{
+                                               name:      tagName,
+                                               value:     
tagData.values[elemIndex],
+                                               valueType: tagData.valueType,
+                                       })
+                               }
+                       }
+               }
+       } else {
+               // Include all tags if no projection specified
+               elementTags = make([]Tag, 0, len(block.tags))
+               for tagName, tagData := range block.tags {
+                       if elemIndex < len(tagData.values) {
+                               elementTags = append(elementTags, Tag{
+                                       name:      tagName,
+                                       value:     tagData.values[elemIndex],
+                                       valueType: tagData.valueType,
+                               })
+                       }
+               }
+       }
+
+       return elementTags
+}
+
+// mergeWorkerResults merges results from all worker shards with error 
handling.
+func (qr *queryResult) mergeWorkerResults() *QueryResponse {
+       // Check for errors first
+       var err error
+       for i := range qr.shards {
+               if qr.shards[i].Error != nil {
+                       err = multierr.Append(err, qr.shards[i].Error)
+               }
+       }
+
+       if err != nil {
+               return &QueryResponse{Error: err}
+       }
+
+       // Merge results with ordering from request
+       if qr.asc {
+               return mergeQueryResponseShardsAsc(qr.shards, 
qr.request.MaxElementSize)
+       } else {
+               return mergeQueryResponseShardsDesc(qr.shards, 
qr.request.MaxElementSize)
+       }
+}
+
+// Release releases resources associated with the query result.
+func (qr *queryResult) Release() {
+       if qr.released {
+               return
+       }
+       qr.released = true
+
+       if qr.bs != nil {
+               qr.bs.close()
+       }
+
+       if qr.snapshot != nil {
+               qr.snapshot.decRef()
+               qr.snapshot = nil
+       }
+}
+
+// mergeQueryResponseShardsAsc merges multiple QueryResponse shards in 
ascending order.
+func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
+       result := &QueryResponse{
+               Keys: make([]int64, 0, maxElements),
+               Data: make([][]byte, 0, maxElements),
+               Tags: make([][]Tag, 0, maxElements),
+               SIDs: make([]common.SeriesID, 0, maxElements),
+       }
+
+       // Simple concatenation for now - TODO: implement proper merge sort
+       for _, shard := range shards {
+               for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
+                       result.Keys = append(result.Keys, shard.Keys[i])
+                       result.Data = append(result.Data, shard.Data[i])
+                       result.Tags = append(result.Tags, shard.Tags[i])
+                       result.SIDs = append(result.SIDs, shard.SIDs[i])
+               }
+       }
+
+       return result
+}
+
+// mergeQueryResponseShardsDesc merges multiple QueryResponse shards in 
descending order.
+func mergeQueryResponseShardsDesc(shards []*QueryResponse, maxElements int) 
*QueryResponse {
+       result := &QueryResponse{
+               Keys: make([]int64, 0, maxElements),
+               Data: make([][]byte, 0, maxElements),
+               Tags: make([][]Tag, 0, maxElements),
+               SIDs: make([]common.SeriesID, 0, maxElements),
+       }
+
+       // Simple concatenation for now - TODO: implement proper merge sort
+       for _, shard := range shards {
+               for i := 0; i < shard.Len() && result.Len() < maxElements; i++ {
+                       result.Keys = append(result.Keys, shard.Keys[i])
+                       result.Data = append(result.Data, shard.Data[i])
+                       result.Tags = append(result.Tags, shard.Tags[i])
+                       result.SIDs = append(result.SIDs, shard.SIDs[i])
+               }
+       }
+
+       return result
+}
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 4c7fdc4f..262a72f1 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -19,10 +19,14 @@ package sidx
 
 import (
        "context"
+       "math"
+       "sort"
        "sync"
        "sync/atomic"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -37,6 +41,7 @@ type sidx struct {
        loopCloser                 *run.Closer
        gc                         *gc
        l                          *logger.Logger
+       pm                         protector.Memory
        totalIntroduceLoopStarted  atomic.Int64
        totalIntroduceLoopFinished atomic.Int64
        mu                         sync.RWMutex
@@ -58,6 +63,7 @@ func NewSIDX(opts *Options) (SIDX, error) {
                mergeCh:       make(chan *mergerIntroduction),
                loopCloser:    run.NewCloser(1),
                l:             logger.GetLogger().Named("sidx"),
+               pm:            opts.Memory,
        }
 
        // Initialize garbage collector
@@ -110,6 +116,42 @@ func (s *sidx) Write(ctx context.Context, reqs 
[]WriteRequest) error {
        }
 }
 
+// extractKeyRange extracts min/max key range from QueryRequest.
+func extractKeyRange(req QueryRequest) (int64, int64) {
+       minKey := int64(math.MinInt64)
+       maxKey := int64(math.MaxInt64)
+
+       if req.MinKey != nil {
+               minKey = *req.MinKey
+       }
+       if req.MaxKey != nil {
+               maxKey = *req.MaxKey
+       }
+
+       return minKey, maxKey
+}
+
+// extractOrdering extracts ordering direction from QueryRequest.
+func extractOrdering(req QueryRequest) bool {
+       if req.Order == nil {
+               return true // Default ascending
+       }
+       return req.Order.Sort != modelv1.Sort_SORT_DESC
+}
+
+// selectPartsForQuery selects relevant parts from snapshot based on key range.
+func selectPartsForQuery(snap *snapshot, minKey, maxKey int64) []*part {
+       var selectedParts []*part
+
+       for _, pw := range snap.parts {
+               if pw.isActive() && pw.overlapsKeyRange(minKey, maxKey) {
+                       selectedParts = append(selectedParts, pw.p)
+               }
+       }
+
+       return selectedParts
+}
+
 // Query implements SIDX interface.
 func (s *sidx) Query(ctx context.Context, req QueryRequest) (QueryResult, 
error) {
        if err := req.Validate(); err != nil {
@@ -122,14 +164,46 @@ func (s *sidx) Query(ctx context.Context, req 
QueryRequest) (QueryResult, error)
                return &emptyQueryResult{}, nil
        }
 
-       // Create query result
-       qr := &queryResult{
+       // Extract parameters directly from request
+       minKey, maxKey := extractKeyRange(req)
+       asc := extractOrdering(req)
+
+       // Select relevant parts
+       parts := selectPartsForQuery(snap, minKey, maxKey)
+       if len(parts) == 0 {
+               snap.decRef()
+               return &emptyQueryResult{}, nil
+       }
+
+       // Sort SeriesIDs for efficient processing
+       seriesIDs := make([]common.SeriesID, len(req.SeriesIDs))
+       copy(seriesIDs, req.SeriesIDs)
+       sort.Slice(seriesIDs, func(i, j int) bool {
+               return seriesIDs[i] < seriesIDs[j]
+       })
+
+       // Initialize block scanner using request parameters directly
+       bs := &blockScanner{
+               pm:        s.pm,
+               filter:    req.Filter,
+               l:         s.l,
+               parts:     parts,
+               seriesIDs: seriesIDs,
+               minKey:    minKey,
+               maxKey:    maxKey,
+               asc:       asc,
+       }
+
+       return &queryResult{
                snapshot: snap,
                request:  req,
                ctx:      ctx,
-       }
-
-       return qr, nil
+               bs:       bs,
+               parts:    parts,
+               asc:      asc,
+               pm:       s.pm,
+               l:        s.l,
+       }, nil
 }
 
 // Stats implements SIDX interface.
@@ -313,40 +387,3 @@ func (e *emptyQueryResult) Pull() *QueryResponse {
 func (e *emptyQueryResult) Release() {
        // Nothing to release
 }
-
-// queryResult implements QueryResult interface.
-type queryResult struct {
-       ctx      context.Context
-       snapshot *snapshot
-       request  QueryRequest
-       released bool
-}
-
-func (qr *queryResult) Pull() *QueryResponse {
-       if qr.released {
-               return nil
-       }
-
-       // Simple implementation - return empty response
-       // TODO: Implement actual query logic
-       return &QueryResponse{
-               Keys: []int64{},
-               Data: [][]byte{},
-               Tags: [][]Tag{},
-               SIDs: []common.SeriesID{},
-               Metadata: ResponseMetadata{
-                       ExecutionTimeMs: 0,
-               },
-       }
-}
-
-func (qr *queryResult) Release() {
-       if qr.released {
-               return
-       }
-       qr.released = true
-       if qr.snapshot != nil {
-               qr.snapshot.decRef()
-               qr.snapshot = nil
-       }
-}
diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go
index 3d5ee61e..0baadbad 100644
--- a/banyand/stream/query_by_idx.go
+++ b/banyand/stream/query_by_idx.go
@@ -131,6 +131,7 @@ func (qr *idxResult) load(ctx context.Context, qo 
queryOptions) *model.StreamRes
        }
 
        cursorChan := make(chan int, len(qr.data))
+       is := qr.sm.indexSchema.Load().(indexSchema)
        for i := 0; i < len(qr.data); i++ {
                go func(i int) {
                        select {
@@ -146,7 +147,7 @@ func (qr *idxResult) load(ctx context.Context, qo 
queryOptions) *model.StreamRes
                        }
                        tmpBlock := generateBlock()
                        defer releaseBlock(tmpBlock)
-                       if loadBlockCursor(qr.data[i], tmpBlock, qo, qr.sm) {
+                       if loadBlockCursor(qr.data[i], tmpBlock, qo, is) {
                                cursorChan <- -1
                                return
                        }
diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go
index 1607044a..f91dc293 100644
--- a/banyand/stream/query_by_ts.go
+++ b/banyand/stream/query_by_ts.go
@@ -105,6 +105,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) 
(*model.StreamResult, erro
                        t.shards[i].Reset()
                }
        }
+       is := t.sm.indexSchema.Load().(indexSchema)
        for i := range workerSize {
                go func(workerID int) {
                        tmpBlock := generateBlock()
@@ -121,7 +122,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) 
(*model.StreamResult, erro
                                for _, bs := range batch.bss {
                                        bc := generateBlockCursor()
                                        bc.init(bs.p, &bs.bm, bs.qo)
-                                       if loadBlockCursor(bc, tmpBlock, bs.qo, 
t.sm) {
+                                       if loadBlockCursor(bc, tmpBlock, bs.qo, 
is) {
                                                if !t.asc {
                                                        bc.idx = 
len(bc.timestamps) - 1
                                                }
@@ -156,7 +157,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) 
(*model.StreamResult, erro
        return model.MergeStreamResults(t.shards, t.qo.MaxElementSize, t.asc), 
nil
 }
 
-func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm 
*stream) bool {
+func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, is 
indexSchema) bool {
        tmpBlock.reset()
        if !bc.loadData(tmpBlock) {
                releaseBlockCursor(bc)
@@ -168,7 +169,6 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo 
queryOptions, sm *stre
        for idx, tagFamily := range bc.tagFamilies {
                tagFamilyMap[tagFamily.name] = idx + 1
        }
-       is := sm.indexSchema.Load().(indexSchema)
        for _, tagFamilyProj := range bc.tagProjection {
                for j, tagProj := range tagFamilyProj.Names {
                        tagSpec := is.tagMap[tagProj]

Reply via email to