This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6880003a1f33c40a4bb9a2f9828af7ffeaf4f31b Author: Gao Hongtao <[email protected]> AuthorDate: Mon Aug 25 11:41:32 2025 +0800 Refactor QueryRequest structure: Replace Entities with SeriesIDs for improved query handling. Introduce MinKey and MaxKey for range queries, enhancing validation logic. Update related components and tests to accommodate new structure and ensure robust functionality. --- banyand/internal/sidx/interfaces.go | 60 ++++-- banyand/internal/sidx/interfaces_examples.go | 13 +- banyand/internal/sidx/mock_components.go | 14 +- banyand/internal/sidx/part_wrapper.go | 34 +++ banyand/internal/sidx/part_wrapper_test.go | 75 +++++++ banyand/internal/sidx/query_result.go | 300 +++++++++++++++++++++++++++ banyand/internal/sidx/sidx.go | 121 +++++++---- banyand/stream/query_by_idx.go | 3 +- banyand/stream/query_by_ts.go | 6 +- 9 files changed, 545 insertions(+), 81 deletions(-) diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index 6fa9df21..9950bbac 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "github.com/apache/skywalking-banyandb/api/common" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -127,10 +126,10 @@ type QueryRequest struct { // Name identifies the series/index to query Name string - // Entities specifies entity filtering (same as StreamQueryOptions) - Entities [][]*modelv1.TagValue + // SeriesIDs specifies the series to query (provided externally) + SeriesIDs []common.SeriesID - // Filter for key range and tag-based filtering using index.Filter + // Filter for tag-based filtering using index.Filter // Note: sidx uses bloom filters for tag filtering, not inverted indexes Filter index.Filter @@ -142,6 +141,12 @@ type QueryRequest struct { // MaxElementSize limits result size MaxElementSize int + + // MinKey specifies the minimum key for range queries (nil = no limit) + MinKey *int64 + + // MaxKey specifies the maximum key for range queries (nil = no limit) + MaxKey *int64 } // QueryResponse contains a batch of query results and execution metadata. @@ -353,49 +358,47 @@ func (qr QueryRequest) Validate() error { if qr.Name == "" { return fmt.Errorf("name cannot be empty") } + if len(qr.SeriesIDs) == 0 { + return fmt.Errorf("at least one SeriesID is required") + } if qr.MaxElementSize < 0 { return fmt.Errorf("maxElementSize cannot be negative") } + // Validate key range + if qr.MinKey != nil && qr.MaxKey != nil && *qr.MinKey > *qr.MaxKey { + return fmt.Errorf("MinKey cannot be greater than MaxKey") + } // Validate tag projection names for i, projection := range qr.TagProjection { if projection.Family == "" { return fmt.Errorf("tagProjection[%d] family cannot be empty", i) } } - // Validate entities structure - for i, entityGroup := range qr.Entities { - if len(entityGroup) == 0 { - return fmt.Errorf("entities[%d] cannot be empty", i) - } - for j, tagValue := range entityGroup { - if tagValue == nil { - return fmt.Errorf("entities[%d][%d] cannot be nil", i, j) - } - } - } return nil } // Reset resets the QueryRequest to its zero state. func (qr *QueryRequest) Reset() { qr.Name = "" - qr.Entities = nil + qr.SeriesIDs = nil qr.Filter = nil qr.Order = nil qr.TagProjection = nil qr.MaxElementSize = 0 + qr.MinKey = nil + qr.MaxKey = nil } // CopyFrom copies the QueryRequest from other to qr. func (qr *QueryRequest) CopyFrom(other *QueryRequest) { qr.Name = other.Name - // Deep copy for Entities if it's a slice - if other.Entities != nil { - qr.Entities = make([][]*modelv1.TagValue, len(other.Entities)) - copy(qr.Entities, other.Entities) + // Deep copy for SeriesIDs if it's a slice + if other.SeriesIDs != nil { + qr.SeriesIDs = make([]common.SeriesID, len(other.SeriesIDs)) + copy(qr.SeriesIDs, other.SeriesIDs) } else { - qr.Entities = nil + qr.SeriesIDs = nil } qr.Filter = other.Filter @@ -410,6 +413,21 @@ func (qr *QueryRequest) CopyFrom(other *QueryRequest) { } qr.MaxElementSize = other.MaxElementSize + + // Copy key range pointers + if other.MinKey != nil { + minKey := *other.MinKey + qr.MinKey = &minKey + } else { + qr.MinKey = nil + } + + if other.MaxKey != nil { + maxKey := *other.MaxKey + qr.MaxKey = &maxKey + } else { + qr.MaxKey = nil + } } // Interface Usage Examples and Best Practices diff --git a/banyand/internal/sidx/interfaces_examples.go b/banyand/internal/sidx/interfaces_examples.go index 47b802a3..bfb03ff3 100644 --- a/banyand/internal/sidx/interfaces_examples.go +++ b/banyand/internal/sidx/interfaces_examples.go @@ -27,7 +27,6 @@ import ( "time" "github.com/apache/skywalking-banyandb/api/common" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -107,14 +106,10 @@ func (e *InterfaceUsageExamples) BasicWriteExample(ctx context.Context) error { func (e *InterfaceUsageExamples) AdvancedQueryExample(ctx context.Context) error { // Create query request with range and tag filtering queryReq := QueryRequest{ - Name: "trace-sidx", - Entities: [][]*modelv1.TagValue{ - { - {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "user-service"}}}, - }, - }, - Filter: nil, // In production, use actual index.Filter implementation - Order: nil, // In production, use actual index.OrderBy implementation + Name: "trace-sidx", + SeriesIDs: []common.SeriesID{1, 2, 3}, // Example series IDs + Filter: nil, // In production, use actual index.Filter implementation + Order: nil, // In production, use actual index.OrderBy implementation TagProjection: []model.TagProjection{ {Family: "service", Names: []string{"name"}}, {Family: "endpoint", Names: []string{"path"}}, diff --git a/banyand/internal/sidx/mock_components.go b/banyand/internal/sidx/mock_components.go index 554b78d9..2e9894e4 100644 --- a/banyand/internal/sidx/mock_components.go +++ b/banyand/internal/sidx/mock_components.go @@ -272,11 +272,15 @@ func (mq *MockQuerier) matchesQuery(elem WriteRequest, req QueryRequest) bool { // Basic range filtering - in reality this would be more sophisticated // For the mock, we'll match all elements unless specific filters are applied - // Apply entity filtering if specified - if len(req.Entities) > 0 { - // Simplified entity matching for the mock - // In practice, this would properly evaluate entity constraints - _ = elem // Use elem to avoid unused parameter warning + // Apply SeriesID filtering if specified + if len(req.SeriesIDs) > 0 { + // Check if the element's SeriesID is in the requested list + for _, sid := range req.SeriesIDs { + if elem.SeriesID == sid { + return true + } + } + return false } // For the mock, we'll accept all elements that pass basic checks diff --git a/banyand/internal/sidx/part_wrapper.go b/banyand/internal/sidx/part_wrapper.go index c5c41137..c3699148 100644 --- a/banyand/internal/sidx/part_wrapper.go +++ b/banyand/internal/sidx/part_wrapper.go @@ -222,3 +222,37 @@ func (pw *partWrapper) String() string { return fmt.Sprintf("partWrapper{id=%d, state=%s, ref=%d, path=%s}", pw.ID(), state, refCount, pw.p.path) } + +// overlapsKeyRange checks if the part overlaps with the given key range. +// Returns true if there is any overlap between the part's key range and the query range. +// Uses part metadata to perform efficient range filtering without I/O. +func (pw *partWrapper) overlapsKeyRange(minKey, maxKey int64) bool { + if pw.p == nil { + return false + } + + // Validate input range + if minKey > maxKey { + return false + } + + // Check if part metadata is available + if pw.p.partMetadata == nil { + // If no metadata available, assume overlap to be safe + // This ensures we don't skip parts that might contain relevant data + return true + } + + pm := pw.p.partMetadata + + // Check for non-overlapping ranges using De Morgan's law: + // Two ranges [a,b] and [c,d] don't overlap if: b < c OR a > d + // Therefore, they DO overlap if: NOT(b < c OR a > d) = (b >= c AND a <= d) + // Simplified: part.MaxKey >= query.MinKey AND part.MinKey <= query.MaxKey + if pm.MaxKey < minKey || pm.MinKey > maxKey { + return false + } + + // Ranges overlap + return true +} diff --git a/banyand/internal/sidx/part_wrapper_test.go b/banyand/internal/sidx/part_wrapper_test.go index c05b5e87..42bfebd1 100644 --- a/banyand/internal/sidx/part_wrapper_test.go +++ b/banyand/internal/sidx/part_wrapper_test.go @@ -514,3 +514,78 @@ func TestPartWrapper_CleanupIdempotency(t *testing.T) { assert.True(t, pw.isRemoved()) assert.Equal(t, int32(0), pw.refCount()) } + +func TestPartWrapper_OverlapsKeyRange(t *testing.T) { + tests := []struct { + name string + partMin int64 + partMax int64 + queryMin int64 + queryMax int64 + expected bool + }{ + {"complete_overlap", 10, 20, 5, 25, true}, + {"part_inside_query", 10, 20, 5, 25, true}, + {"query_inside_part", 5, 25, 10, 20, true}, + {"partial_overlap_left", 10, 20, 15, 25, true}, + {"partial_overlap_right", 10, 20, 5, 15, true}, + {"adjacent_left_no_overlap", 10, 20, 1, 9, false}, + {"adjacent_right_no_overlap", 10, 20, 21, 30, false}, + {"touching_left_boundary", 10, 20, 5, 10, true}, + {"touching_right_boundary", 10, 20, 20, 25, true}, + {"completely_separate_left", 10, 20, 1, 5, false}, + {"completely_separate_right", 10, 20, 25, 30, false}, + {"exact_match", 10, 20, 10, 20, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create part with metadata + p := &part{ + path: "/test/part/001", + partMetadata: &partMetadata{ + ID: 1, + MinKey: tt.partMin, + MaxKey: tt.partMax, + }, + } + pw := newPartWrapper(p) + + result := pw.overlapsKeyRange(tt.queryMin, tt.queryMax) + assert.Equal(t, tt.expected, result, + "part[%d,%d] query[%d,%d] should be %v", + tt.partMin, tt.partMax, tt.queryMin, tt.queryMax, tt.expected) + }) + } +} + +func TestPartWrapper_OverlapsKeyRange_EdgeCases(t *testing.T) { + t.Run("nil_part", func(t *testing.T) { + pw := newPartWrapper(nil) + assert.False(t, pw.overlapsKeyRange(10, 20)) + }) + + t.Run("nil_metadata", func(t *testing.T) { + p := &part{ + path: "/test/part/001", + partMetadata: nil, + } + pw := newPartWrapper(p) + // Should return true (safe default) when metadata is unavailable + assert.True(t, pw.overlapsKeyRange(10, 20)) + }) + + t.Run("invalid_query_range", func(t *testing.T) { + p := &part{ + path: "/test/part/001", + partMetadata: &partMetadata{ + ID: 1, + MinKey: 10, + MaxKey: 20, + }, + } + pw := newPartWrapper(p) + // Query range where min > max should return false + assert.False(t, pw.overlapsKeyRange(25, 15)) + }) +} diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go new file mode 100644 index 00000000..6c3ea5a9 --- /dev/null +++ b/banyand/internal/sidx/query_result.go @@ -0,0 +1,300 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "context" + "sync" + + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// queryResult implements QueryResult interface with worker pool pattern. +// Following the tsResult architecture from the stream module. +type queryResult struct { + // Core query components + snapshot *snapshot // Reference to current snapshot + request QueryRequest // Original query request (contains all parameters) + ctx context.Context + + // Block scanning components + bs *blockScanner // Block scanner for iteration + parts []*part // Selected parts for query + + // Worker coordination + shards []*QueryResponse // Result shards from parallel workers + pm protector.Memory // Memory quota management + l *logger.Logger // Logger instance + + // Query parameters (derived from request) + asc bool // Ordering direction + + // State management + released bool +} + +// Pull returns the next batch of query results using parallel worker processing. +func (qr *queryResult) Pull() *QueryResponse { + if qr.released || qr.bs == nil { + return nil + } + + return qr.runBlockScanner() +} + +// runBlockScanner coordinates the worker pool with block scanner following tsResult pattern. +func (qr *queryResult) runBlockScanner() *QueryResponse { + workerSize := cgroups.CPUs() + batchCh := make(chan *blockScanResultBatch, workerSize) + + // Initialize worker result shards + if qr.shards == nil { + qr.shards = make([]*QueryResponse, workerSize) + for i := range qr.shards { + qr.shards[i] = &QueryResponse{ + Keys: make([]int64, 0), + Data: make([][]byte, 0), + Tags: make([][]Tag, 0), + SIDs: make([]common.SeriesID, 0), + } + } + } else { + // Reset existing shards + for i := range qr.shards { + qr.shards[i].Reset() + } + } + + // Launch worker pool + var workerWg sync.WaitGroup + workerWg.Add(workerSize) + + for i := range workerSize { + go func(workerID int) { + defer workerWg.Done() + qr.processWorkerBatches(workerID, batchCh) + }(i) + } + + // Start block scanning + go func() { + qr.bs.scan(qr.ctx, batchCh) + close(batchCh) + }() + + workerWg.Wait() + + // Check for completion + if len(qr.bs.parts) == 0 { + qr.bs.close() + qr.bs = nil + } + + // Merge results from all workers + return qr.mergeWorkerResults() +} + +// processWorkerBatches processes batches in a worker goroutine. +func (qr *queryResult) processWorkerBatches(workerID int, batchCh chan *blockScanResultBatch) { + tmpBlock := generateBlock() + defer releaseBlock(tmpBlock) + + for batch := range batchCh { + if batch.err != nil { + qr.shards[workerID].Error = batch.err + releaseBlockScanResultBatch(batch) + continue + } + + for _, bs := range batch.bss { + qr.loadAndProcessBlock(tmpBlock, bs, qr.shards[workerID]) + } + + releaseBlockScanResultBatch(batch) + } +} + +// loadAndProcessBlock loads a block from part and processes it into QueryResponse format. +func (qr *queryResult) loadAndProcessBlock(tmpBlock *block, bs blockScanResult, result *QueryResponse) bool { + tmpBlock.reset() + + // Load block data from part (similar to stream's loadBlockCursor) + if !qr.loadBlockData(tmpBlock, bs.p, &bs.bm) { + return false + } + + // Convert block data to QueryResponse format + qr.convertBlockToResponse(tmpBlock, bs.bm.seriesID, result) + + return true +} + +// loadBlockData loads block data from part using block metadata. +func (qr *queryResult) loadBlockData(tmpBlock *block, p *part, bm *blockMetadata) bool { + // TODO: Implement actual block loading from part + // This should use the existing block reader functionality + _ = tmpBlock // TODO: use when implementing block loading + _ = p // TODO: use when implementing block loading + _ = bm // TODO: use when implementing block loading + // For now, return false to avoid processing + return false +} + +// convertBlockToResponse converts SIDX block data to QueryResponse format. +func (qr *queryResult) convertBlockToResponse(block *block, seriesID common.SeriesID, result *QueryResponse) { + elemCount := len(block.userKeys) + + for i := 0; i < elemCount; i++ { + // Apply MaxElementSize limit from request + if result.Len() >= qr.request.MaxElementSize { + break + } + + // Copy parallel arrays + result.Keys = append(result.Keys, block.userKeys[i]) + result.Data = append(result.Data, block.data[i]) + result.SIDs = append(result.SIDs, seriesID) + + // Convert tag map to tag slice for this element + elementTags := qr.extractElementTags(block, i) + result.Tags = append(result.Tags, elementTags) + } +} + +// extractElementTags extracts tags for a specific element with projection support. +func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag { + var elementTags []Tag + + // Apply tag projection from request + if len(qr.request.TagProjection) > 0 { + elementTags = make([]Tag, 0, len(qr.request.TagProjection)) + for _, proj := range qr.request.TagProjection { + for _, tagName := range proj.Names { + if tagData, exists := block.tags[tagName]; exists && elemIndex < len(tagData.values) { + elementTags = append(elementTags, Tag{ + name: tagName, + value: tagData.values[elemIndex], + valueType: tagData.valueType, + }) + } + } + } + } else { + // Include all tags if no projection specified + elementTags = make([]Tag, 0, len(block.tags)) + for tagName, tagData := range block.tags { + if elemIndex < len(tagData.values) { + elementTags = append(elementTags, Tag{ + name: tagName, + value: tagData.values[elemIndex], + valueType: tagData.valueType, + }) + } + } + } + + return elementTags +} + +// mergeWorkerResults merges results from all worker shards with error handling. +func (qr *queryResult) mergeWorkerResults() *QueryResponse { + // Check for errors first + var err error + for i := range qr.shards { + if qr.shards[i].Error != nil { + err = multierr.Append(err, qr.shards[i].Error) + } + } + + if err != nil { + return &QueryResponse{Error: err} + } + + // Merge results with ordering from request + if qr.asc { + return mergeQueryResponseShardsAsc(qr.shards, qr.request.MaxElementSize) + } else { + return mergeQueryResponseShardsDesc(qr.shards, qr.request.MaxElementSize) + } +} + +// Release releases resources associated with the query result. +func (qr *queryResult) Release() { + if qr.released { + return + } + qr.released = true + + if qr.bs != nil { + qr.bs.close() + } + + if qr.snapshot != nil { + qr.snapshot.decRef() + qr.snapshot = nil + } +} + +// mergeQueryResponseShardsAsc merges multiple QueryResponse shards in ascending order. +func mergeQueryResponseShardsAsc(shards []*QueryResponse, maxElements int) *QueryResponse { + result := &QueryResponse{ + Keys: make([]int64, 0, maxElements), + Data: make([][]byte, 0, maxElements), + Tags: make([][]Tag, 0, maxElements), + SIDs: make([]common.SeriesID, 0, maxElements), + } + + // Simple concatenation for now - TODO: implement proper merge sort + for _, shard := range shards { + for i := 0; i < shard.Len() && result.Len() < maxElements; i++ { + result.Keys = append(result.Keys, shard.Keys[i]) + result.Data = append(result.Data, shard.Data[i]) + result.Tags = append(result.Tags, shard.Tags[i]) + result.SIDs = append(result.SIDs, shard.SIDs[i]) + } + } + + return result +} + +// mergeQueryResponseShardsDesc merges multiple QueryResponse shards in descending order. +func mergeQueryResponseShardsDesc(shards []*QueryResponse, maxElements int) *QueryResponse { + result := &QueryResponse{ + Keys: make([]int64, 0, maxElements), + Data: make([][]byte, 0, maxElements), + Tags: make([][]Tag, 0, maxElements), + SIDs: make([]common.SeriesID, 0, maxElements), + } + + // Simple concatenation for now - TODO: implement proper merge sort + for _, shard := range shards { + for i := 0; i < shard.Len() && result.Len() < maxElements; i++ { + result.Keys = append(result.Keys, shard.Keys[i]) + result.Data = append(result.Data, shard.Data[i]) + result.Tags = append(result.Tags, shard.Tags[i]) + result.SIDs = append(result.SIDs, shard.SIDs[i]) + } + } + + return result +} diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index 4c7fdc4f..262a72f1 100644 --- a/banyand/internal/sidx/sidx.go +++ b/banyand/internal/sidx/sidx.go @@ -19,10 +19,14 @@ package sidx import ( "context" + "math" + "sort" "sync" "sync/atomic" "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/watcher" @@ -37,6 +41,7 @@ type sidx struct { loopCloser *run.Closer gc *gc l *logger.Logger + pm protector.Memory totalIntroduceLoopStarted atomic.Int64 totalIntroduceLoopFinished atomic.Int64 mu sync.RWMutex @@ -58,6 +63,7 @@ func NewSIDX(opts *Options) (SIDX, error) { mergeCh: make(chan *mergerIntroduction), loopCloser: run.NewCloser(1), l: logger.GetLogger().Named("sidx"), + pm: opts.Memory, } // Initialize garbage collector @@ -110,6 +116,42 @@ func (s *sidx) Write(ctx context.Context, reqs []WriteRequest) error { } } +// extractKeyRange extracts min/max key range from QueryRequest. +func extractKeyRange(req QueryRequest) (int64, int64) { + minKey := int64(math.MinInt64) + maxKey := int64(math.MaxInt64) + + if req.MinKey != nil { + minKey = *req.MinKey + } + if req.MaxKey != nil { + maxKey = *req.MaxKey + } + + return minKey, maxKey +} + +// extractOrdering extracts ordering direction from QueryRequest. +func extractOrdering(req QueryRequest) bool { + if req.Order == nil { + return true // Default ascending + } + return req.Order.Sort != modelv1.Sort_SORT_DESC +} + +// selectPartsForQuery selects relevant parts from snapshot based on key range. +func selectPartsForQuery(snap *snapshot, minKey, maxKey int64) []*part { + var selectedParts []*part + + for _, pw := range snap.parts { + if pw.isActive() && pw.overlapsKeyRange(minKey, maxKey) { + selectedParts = append(selectedParts, pw.p) + } + } + + return selectedParts +} + // Query implements SIDX interface. func (s *sidx) Query(ctx context.Context, req QueryRequest) (QueryResult, error) { if err := req.Validate(); err != nil { @@ -122,14 +164,46 @@ func (s *sidx) Query(ctx context.Context, req QueryRequest) (QueryResult, error) return &emptyQueryResult{}, nil } - // Create query result - qr := &queryResult{ + // Extract parameters directly from request + minKey, maxKey := extractKeyRange(req) + asc := extractOrdering(req) + + // Select relevant parts + parts := selectPartsForQuery(snap, minKey, maxKey) + if len(parts) == 0 { + snap.decRef() + return &emptyQueryResult{}, nil + } + + // Sort SeriesIDs for efficient processing + seriesIDs := make([]common.SeriesID, len(req.SeriesIDs)) + copy(seriesIDs, req.SeriesIDs) + sort.Slice(seriesIDs, func(i, j int) bool { + return seriesIDs[i] < seriesIDs[j] + }) + + // Initialize block scanner using request parameters directly + bs := &blockScanner{ + pm: s.pm, + filter: req.Filter, + l: s.l, + parts: parts, + seriesIDs: seriesIDs, + minKey: minKey, + maxKey: maxKey, + asc: asc, + } + + return &queryResult{ snapshot: snap, request: req, ctx: ctx, - } - - return qr, nil + bs: bs, + parts: parts, + asc: asc, + pm: s.pm, + l: s.l, + }, nil } // Stats implements SIDX interface. @@ -313,40 +387,3 @@ func (e *emptyQueryResult) Pull() *QueryResponse { func (e *emptyQueryResult) Release() { // Nothing to release } - -// queryResult implements QueryResult interface. -type queryResult struct { - ctx context.Context - snapshot *snapshot - request QueryRequest - released bool -} - -func (qr *queryResult) Pull() *QueryResponse { - if qr.released { - return nil - } - - // Simple implementation - return empty response - // TODO: Implement actual query logic - return &QueryResponse{ - Keys: []int64{}, - Data: [][]byte{}, - Tags: [][]Tag{}, - SIDs: []common.SeriesID{}, - Metadata: ResponseMetadata{ - ExecutionTimeMs: 0, - }, - } -} - -func (qr *queryResult) Release() { - if qr.released { - return - } - qr.released = true - if qr.snapshot != nil { - qr.snapshot.decRef() - qr.snapshot = nil - } -} diff --git a/banyand/stream/query_by_idx.go b/banyand/stream/query_by_idx.go index 3d5ee61e..0baadbad 100644 --- a/banyand/stream/query_by_idx.go +++ b/banyand/stream/query_by_idx.go @@ -131,6 +131,7 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes } cursorChan := make(chan int, len(qr.data)) + is := qr.sm.indexSchema.Load().(indexSchema) for i := 0; i < len(qr.data); i++ { go func(i int) { select { @@ -146,7 +147,7 @@ func (qr *idxResult) load(ctx context.Context, qo queryOptions) *model.StreamRes } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) - if loadBlockCursor(qr.data[i], tmpBlock, qo, qr.sm) { + if loadBlockCursor(qr.data[i], tmpBlock, qo, is) { cursorChan <- -1 return } diff --git a/banyand/stream/query_by_ts.go b/banyand/stream/query_by_ts.go index 1607044a..f91dc293 100644 --- a/banyand/stream/query_by_ts.go +++ b/banyand/stream/query_by_ts.go @@ -105,6 +105,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) (*model.StreamResult, erro t.shards[i].Reset() } } + is := t.sm.indexSchema.Load().(indexSchema) for i := range workerSize { go func(workerID int) { tmpBlock := generateBlock() @@ -121,7 +122,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) (*model.StreamResult, erro for _, bs := range batch.bss { bc := generateBlockCursor() bc.init(bs.p, &bs.bm, bs.qo) - if loadBlockCursor(bc, tmpBlock, bs.qo, t.sm) { + if loadBlockCursor(bc, tmpBlock, bs.qo, is) { if !t.asc { bc.idx = len(bc.timestamps) - 1 } @@ -156,7 +157,7 @@ func (t *tsResult) runTabScanner(ctx context.Context) (*model.StreamResult, erro return model.MergeStreamResults(t.shards, t.qo.MaxElementSize, t.asc), nil } -func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stream) bool { +func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, is indexSchema) bool { tmpBlock.reset() if !bc.loadData(tmpBlock) { releaseBlockCursor(bc) @@ -168,7 +169,6 @@ func loadBlockCursor(bc *blockCursor, tmpBlock *block, qo queryOptions, sm *stre for idx, tagFamily := range bc.tagFamilies { tagFamilyMap[tagFamily.name] = idx + 1 } - is := sm.indexSchema.Load().(indexSchema) for _, tagFamilyProj := range bc.tagProjection { for j, tagProj := range tagFamilyProj.Names { tagSpec := is.tagMap[tagProj]
