This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sidx/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3457435cb5ba4dd12bf8d9370872301583b54f6e Author: Gao Hongtao <[email protected]> AuthorDate: Wed Aug 27 12:43:48 2025 +0800 Update QueryRequest to include SeriesIDs: Modify various test cases and components to incorporate SeriesIDs in QueryRequest, enhancing query handling and ensuring consistency across tests. Adjust comments for clarity and improve the structure of test cases related to query results and mock components. --- banyand/internal/sidx/TODO.md | 20 +++++++++---------- banyand/internal/sidx/block_scanner_test.go | 4 ++-- .../internal/sidx/integration_test_framework.go | 18 +++++++++++++---- .../sidx/integration_test_framework_test.go | 4 ++++ banyand/internal/sidx/iter_test.go | 16 ++++++++++----- banyand/internal/sidx/mock_components_test.go | 4 ++++ banyand/internal/sidx/mock_sidx_test.go | 10 ++++++---- banyand/internal/sidx/mock_usage_test.go | 23 ++++++++++++++++++---- banyand/internal/sidx/part_iter_test.go | 4 ++-- banyand/internal/sidx/query_result.go | 5 +++-- banyand/internal/sidx/query_result_test.go | 19 +++++++++--------- 11 files changed, 85 insertions(+), 42 deletions(-) diff --git a/banyand/internal/sidx/TODO.md b/banyand/internal/sidx/TODO.md index 38fa5ed1..51fb8b0c 100644 --- a/banyand/internal/sidx/TODO.md +++ b/banyand/internal/sidx/TODO.md @@ -63,16 +63,16 @@ This document tracks the implementation progress of the Secondary Index File Sys - [x] Worker coordination and error propagation ### 6.4 Query Result (`query_result.go` - like stream's `tsResult`) -- [ ] **Implementation Tasks**: - - [ ] Create `queryResult` struct implementing `QueryResult` interface - - [ ] Implement `Pull() *QueryResponse` with worker pool pattern - - [ ] Add `runBlockScanner(ctx)` for parallel processing - - [ ] Create `Release()` for resource cleanup -- [ ] **Test Cases**: - - [ ] Pull/Release pattern prevents resource leaks - - [ ] Parallel workers maintain result ordering - - [ ] Result merging produces correct final output - - [ ] Performance scales with worker count +- [x] **Implementation Tasks**: + - [x] Create `queryResult` struct implementing `QueryResult` interface + - [x] Implement `Pull() *QueryResponse` with worker pool pattern + - [x] Add `runBlockScanner(ctx)` for parallel processing + - [x] Create `Release()` for resource cleanup +- [x] **Test Cases**: + - [x] Pull/Release pattern prevents resource leaks + - [x] Parallel workers maintain result ordering + - [x] Result merging produces correct final output + - [x] Performance scales with worker count ### 6.5 SIDX Query Interface (extend `sidx.go`) - [ ] **Implementation Tasks**: diff --git a/banyand/internal/sidx/block_scanner_test.go b/banyand/internal/sidx/block_scanner_test.go index 39ea3257..fea166bf 100644 --- a/banyand/internal/sidx/block_scanner_test.go +++ b/banyand/internal/sidx/block_scanner_test.go @@ -286,7 +286,7 @@ func TestBlockScanner_ContextCancellation(t *testing.T) { } // With empty parts, no batches should be received - assert.Equal(t, 0, batchesReceived, "Empty parts should produce no batches even with cancelled context") + assert.Equal(t, 0, batchesReceived, "Empty parts should produce no batches even with canceled context") scanner.close() } @@ -298,7 +298,7 @@ func TestBlockScanner_FilteredScan(t *testing.T) { // Test with nil filter and empty parts parts := []*part{} - var mockFilter index.Filter = nil + var mockFilter index.Filter scanner := &blockScanner{ pm: mockProtector, diff --git a/banyand/internal/sidx/integration_test_framework.go b/banyand/internal/sidx/integration_test_framework.go index 908ccf12..18fd40d9 100644 --- a/banyand/internal/sidx/integration_test_framework.go +++ b/banyand/internal/sidx/integration_test_framework.go @@ -517,9 +517,10 @@ func (itf *IntegrationTestFramework) registerDefaultScenarios() { return fmt.Errorf("write failed: %w", err) } - // Query the data back (use the series name that was written) + // Query the data back using SeriesIDs that were written queryReq := QueryRequest{ Name: "series_1", // Match the key used in MockSIDX write + SeriesIDs: []common.SeriesID{common.SeriesID(1), common.SeriesID(2), common.SeriesID(3)}, MaxElementSize: 100, } @@ -628,9 +629,14 @@ func (itf *IntegrationTestFramework) registerDefaultScenarios() { return fmt.Errorf("large write failed: %w", err) } - // Query with pagination + // Query with pagination using all SeriesIDs written + seriesIDs := make([]common.SeriesID, 10) + for i := 1; i <= 10; i++ { + seriesIDs[i-1] = common.SeriesID(i) + } queryReq := QueryRequest{ Name: "large-dataset-query", + SeriesIDs: seriesIDs, MaxElementSize: 50, } @@ -744,8 +750,10 @@ func (itf *IntegrationTestFramework) registerDefaultBenchmarks() { start := time.Now() for i := 0; i < numQueries; i++ { + seriesID := common.SeriesID((i % 10) + 1) // Query existing series queryReq := QueryRequest{ - Name: fmt.Sprintf("series_%d", (i%10)+1), // Query existing series + Name: fmt.Sprintf("series_%d", seriesID), + SeriesIDs: []common.SeriesID{seriesID}, MaxElementSize: 50, } @@ -856,9 +864,11 @@ func (itf *IntegrationTestFramework) registerDefaultStressTests() { return framework.sidx.Write(ctx, requests) } - // Read operation + // Read operation - query one of the series we know exists (from setup) + seriesID := common.SeriesID((workerID % 5) + 1) // Query from the 5 series in setup queryReq := QueryRequest{ Name: fmt.Sprintf("stress-query-%d", workerID), + SeriesIDs: []common.SeriesID{seriesID}, MaxElementSize: 20, } diff --git a/banyand/internal/sidx/integration_test_framework_test.go b/banyand/internal/sidx/integration_test_framework_test.go index 6b0ee5ef..365cfe06 100644 --- a/banyand/internal/sidx/integration_test_framework_test.go +++ b/banyand/internal/sidx/integration_test_framework_test.go @@ -25,6 +25,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" ) func TestIntegrationTestFramework_Creation(t *testing.T) { @@ -483,8 +485,10 @@ func BenchmarkIntegrationTestFramework_QueryPerformance(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { + seriesID := common.SeriesID((i % 5) + 1) // Query existing series queryReq := QueryRequest{ Name: fmt.Sprintf("bench-query-%d", i), + SeriesIDs: []common.SeriesID{seriesID}, MaxElementSize: 50, } diff --git a/banyand/internal/sidx/iter_test.go b/banyand/internal/sidx/iter_test.go index 6837c1dd..93c14e81 100644 --- a/banyand/internal/sidx/iter_test.go +++ b/banyand/internal/sidx/iter_test.go @@ -135,7 +135,10 @@ func TestIterComprehensive(t *testing.T) { // Part 2 { {seriesID: 3, userKey: 300, data: []byte("data3"), tags: []tag{{name: "service", value: []byte("user-service"), valueType: pbv1.ValueTypeStr, indexed: true}}}, - {seriesID: 4, userKey: 400, data: []byte("data4"), tags: []tag{{name: "service", value: []byte("notification-service"), valueType: pbv1.ValueTypeStr, indexed: true}}}, + { + seriesID: 4, userKey: 400, data: []byte("data4"), + tags: []tag{{name: "service", value: []byte("notification-service"), valueType: pbv1.ValueTypeStr, indexed: true}}, + }, }, }, querySids: []common.SeriesID{2, 4}, // Only series 2 and 4 @@ -159,7 +162,10 @@ func TestIterComprehensive(t *testing.T) { // Part 2 { {seriesID: 3, userKey: 250, data: []byte("data3"), tags: []tag{{name: "service", value: []byte("user-service"), valueType: pbv1.ValueTypeStr, indexed: true}}}, - {seriesID: 4, userKey: 350, data: []byte("data4"), tags: []tag{{name: "service", value: []byte("notification-service"), valueType: pbv1.ValueTypeStr, indexed: true}}}, + { + seriesID: 4, userKey: 350, data: []byte("data4"), + tags: []tag{{name: "service", value: []byte("notification-service"), valueType: pbv1.ValueTypeStr, indexed: true}}, + }, }, }, querySids: []common.SeriesID{1, 2, 3, 4}, @@ -519,7 +525,7 @@ func TestBlockMetadataLess(t *testing.T) { }) } -// Helper types and functions +// Helper types and functions. type blockExpectation struct { seriesID common.SeriesID @@ -527,9 +533,9 @@ type blockExpectation struct { maxKey int64 } -// mockBlockFilter is already defined in part_iter_test.go +// mockBlockFilter is already defined in part_iter_test.go. -// runIteratorTest runs the iterator test with the given test case and parts +// runIteratorTest runs the iterator test with the given test case and parts. func runIteratorTest(t *testing.T, tc struct { blockFilter index.Filter name string diff --git a/banyand/internal/sidx/mock_components_test.go b/banyand/internal/sidx/mock_components_test.go index 7e805f5a..04e3d68f 100644 --- a/banyand/internal/sidx/mock_components_test.go +++ b/banyand/internal/sidx/mock_components_test.go @@ -220,6 +220,7 @@ func TestMockQuerier_BasicOperations(t *testing.T) { // Test query operations queryReq := QueryRequest{ Name: "test-query", + SeriesIDs: []common.SeriesID{1, 2}, MaxElementSize: 10, } @@ -264,6 +265,7 @@ func TestMockQuerier_QueryBatching(t *testing.T) { // Query with small batch size queryReq := QueryRequest{ Name: "batch-test", + SeriesIDs: []common.SeriesID{1}, MaxElementSize: 3, } @@ -425,6 +427,7 @@ func TestMockComponentSuite_Integration(t *testing.T) { // Query elements queryReq := QueryRequest{ Name: "integration-test", + SeriesIDs: []common.SeriesID{1, 2}, MaxElementSize: 10, } @@ -629,6 +632,7 @@ func TestMockComponents_PerformanceCharacteristics(t *testing.T) { queryReq := QueryRequest{ Name: "performance-test", + SeriesIDs: []common.SeriesID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, MaxElementSize: 100, } diff --git a/banyand/internal/sidx/mock_sidx_test.go b/banyand/internal/sidx/mock_sidx_test.go index aa4acec1..a57f01b1 100644 --- a/banyand/internal/sidx/mock_sidx_test.go +++ b/banyand/internal/sidx/mock_sidx_test.go @@ -97,6 +97,7 @@ func TestMockSIDX_BasicOperations(t *testing.T) { // Test query operations queryReq := QueryRequest{ Name: "series_1", + SeriesIDs: []common.SeriesID{1}, MaxElementSize: 10, } @@ -295,7 +296,7 @@ func TestMockSIDX_Delays(t *testing.T) { // Test query delay start = time.Now() - queryReq := QueryRequest{Name: "series_1"} + queryReq := QueryRequest{Name: "series_1", SeriesIDs: []common.SeriesID{1}} result, err := mock.Query(ctx, queryReq) require.NoError(t, err) result.Release() @@ -337,7 +338,7 @@ func TestMockSIDX_Sorting(t *testing.T) { require.NoError(t, err) // Query series 1 - should be sorted by key - queryReq := QueryRequest{Name: "series_1"} + queryReq := QueryRequest{Name: "series_1", SeriesIDs: []common.SeriesID{1}} result, err := mock.Query(ctx, queryReq) require.NoError(t, err) defer result.Release() @@ -348,7 +349,7 @@ func TestMockSIDX_Sorting(t *testing.T) { assert.Equal(t, [][]byte{[]byte("data1"), []byte("data3")}, response.Data) // Query series 2 - should be sorted by key - queryReq = QueryRequest{Name: "series_2"} + queryReq = QueryRequest{Name: "series_2", SeriesIDs: []common.SeriesID{2}} result, err = mock.Query(ctx, queryReq) require.NoError(t, err) defer result.Release() @@ -383,6 +384,7 @@ func TestMockSIDX_QueryBatching(t *testing.T) { // Query with small batch size queryReq := QueryRequest{ Name: "series_1", + SeriesIDs: []common.SeriesID{1}, MaxElementSize: 3, } result, err := mock.Query(ctx, queryReq) @@ -527,7 +529,7 @@ func TestMockSIDX_DynamicConfiguration(t *testing.T) { mock.SetQueryDelay(30) start = time.Now() - result, err := mock.Query(ctx, QueryRequest{Name: "series_1"}) + result, err := mock.Query(ctx, QueryRequest{Name: "series_1", SeriesIDs: []common.SeriesID{1}}) require.NoError(t, err) result.Release() elapsed = time.Since(start) diff --git a/banyand/internal/sidx/mock_usage_test.go b/banyand/internal/sidx/mock_usage_test.go index f225392f..5a823521 100644 --- a/banyand/internal/sidx/mock_usage_test.go +++ b/banyand/internal/sidx/mock_usage_test.go @@ -68,6 +68,7 @@ func TestDocumentation_BasicWriteReadWorkflow(t *testing.T) { // Query the data back (from documentation example) queryReq := QueryRequest{ Name: "series_1", // Matches the series name used in mock + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, MaxElementSize: 100, } @@ -116,6 +117,7 @@ func TestDocumentation_ComponentIntegration(t *testing.T) { // Query data (from documentation example) queryReq := QueryRequest{ Name: "test-query", + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, MaxElementSize: 10, } @@ -352,6 +354,7 @@ func TestDocumentation_TroubleshootingExamples(t *testing.T) { // Without syncing, query should return no results queryReq := QueryRequest{ Name: "test-query", + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, MaxElementSize: 100, } result, err := suite.Querier.Query(ctx, queryReq) @@ -492,7 +495,11 @@ func TestDocumentation_DebugTips(t *testing.T) { assert.Equal(t, int64(2), count) // Query to increment query count - queryReq := QueryRequest{Name: "series_1", MaxElementSize: 10} + queryReq := QueryRequest{ + Name: "series_1", + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, + MaxElementSize: 10, + } result, err := mockSIDX.Query(ctx, queryReq) require.NoError(t, err) result.Release() @@ -523,7 +530,11 @@ func TestDocumentation_Migration(t *testing.T) { err := sidx.Write(ctx, writeReqs) require.NoError(t, err) - queryReq := QueryRequest{Name: "series_1", MaxElementSize: 10} + queryReq := QueryRequest{ + Name: "series_1", + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, + MaxElementSize: 10, + } result, err := sidx.Query(ctx, queryReq) require.NoError(t, err) defer result.Release() @@ -564,7 +575,11 @@ func TestDocumentation_PerformanceCharacteristics(t *testing.T) { assert.Greater(t, throughput, 1000.0, "Should achieve reasonable write throughput") // Test query performance (from documentation) - queryReq := QueryRequest{Name: "series_1", MaxElementSize: 50} + queryReq := QueryRequest{ + Name: "series_1", + SeriesIDs: []common.SeriesID{common.SeriesID(1)}, + MaxElementSize: 50, + } start = time.Now() result, err := mockSIDX.Query(ctx, queryReq) @@ -593,7 +608,7 @@ func TestDocumentation_PerformanceCharacteristics(t *testing.T) { // Helper function from documentation. func int64ToBytesForTest(val int64) []byte { result := make([]byte, 8) - for i := 0; i < 8; i++ { + for i := range 8 { result[7-i] = byte(val >> (8 * i)) } return result diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go index 0f5f6677..3289645e 100644 --- a/banyand/internal/sidx/part_iter_test.go +++ b/banyand/internal/sidx/part_iter_test.go @@ -684,7 +684,7 @@ type mockBlockFilter struct { shouldSkip bool } -func (mbf *mockBlockFilter) ShouldSkip(filterOp index.FilterOp) (bool, error) { +func (mbf *mockBlockFilter) ShouldSkip(_ index.FilterOp) (bool, error) { if mbf.err != nil { return false, mbf.err } @@ -696,7 +696,7 @@ func (mbf *mockBlockFilter) String() string { return "mockBlockFilter" } -func (mbf *mockBlockFilter) Execute(getSearcher index.GetSearcher, seriesID common.SeriesID, timeRange *index.RangeOpts) (posting.List, posting.List, error) { +func (mbf *mockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _ *index.RangeOpts) (posting.List, posting.List, error) { // Not used in our tests, return empty implementation return nil, nil, nil } diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go index 140e2012..bceaddab 100644 --- a/banyand/internal/sidx/query_result.go +++ b/banyand/internal/sidx/query_result.go @@ -409,9 +409,8 @@ func (qr *queryResult) mergeWorkerResults() *QueryResponse { // Merge results with ordering from request if qr.asc { return mergeQueryResponseShardsAsc(qr.shards, qr.request.MaxElementSize) - } else { - return mergeQueryResponseShardsDesc(qr.shards, qr.request.MaxElementSize) } + return mergeQueryResponseShardsDesc(qr.shards, qr.request.MaxElementSize) } // Release releases resources associated with the query result. @@ -532,10 +531,12 @@ func (qrh *QueryResponseHeap) Swap(i, j int) { qrh.cursors[i], qrh.cursors[j] = qrh.cursors[j], qrh.cursors[i] } +// Push adds an element to the heap. func (qrh *QueryResponseHeap) Push(x interface{}) { qrh.cursors = append(qrh.cursors, x.(*QueryResponseCursor)) } +// Pop removes and returns the top element from the heap. func (qrh *QueryResponseHeap) Pop() interface{} { old := qrh.cursors n := len(old) diff --git a/banyand/internal/sidx/query_result_test.go b/banyand/internal/sidx/query_result_test.go index c09e86d7..5f445358 100644 --- a/banyand/internal/sidx/query_result_test.go +++ b/banyand/internal/sidx/query_result_test.go @@ -18,6 +18,7 @@ package sidx import ( + "bytes" "container/heap" "context" "testing" @@ -644,7 +645,7 @@ func createBenchmarkResponseDesc(size int, offset, step int64) *QueryResponse { return response } -// Tests for queryResult struct using memPart datasets +// Tests for queryResult struct using memPart datasets. func TestQueryResult_Pull_SingleMemPart(t *testing.T) { // Create test dataset with multiple series @@ -790,7 +791,7 @@ func TestQueryResult_Pull_SingleMemPart(t *testing.T) { } // Verify data matches - if string(data) != string(expected.data) { + if !bytes.Equal(data, expected.data) { t.Errorf("At position %d: expected data %s, got %s", i, string(expected.data), string(data)) } @@ -805,7 +806,7 @@ func TestQueryResult_Pull_SingleMemPart(t *testing.T) { if tag.name != expectedTag.name { t.Errorf("At position %d, tag %d: expected name %s, got %s", i, j, expectedTag.name, tag.name) } - if string(tag.value) != string(expectedTag.value) { + if !bytes.Equal(tag.value, expectedTag.value) { t.Errorf("At position %d, tag %d: expected value %s, got %s", i, j, string(expectedTag.value), string(tag.value)) } if tag.valueType != expectedTag.valueType { @@ -957,7 +958,7 @@ func TestQueryResult_Pull_MultipleMemParts(t *testing.T) { } // Verify data matches - if string(data) != string(expected.data) { + if !bytes.Equal(data, expected.data) { t.Errorf("At position %d: expected data %s, got %s", i, string(expected.data), string(data)) } @@ -972,7 +973,7 @@ func TestQueryResult_Pull_MultipleMemParts(t *testing.T) { if tag.name != expectedTag.name { t.Errorf("At position %d, tag %d: expected name %s, got %s", i, j, expectedTag.name, tag.name) } - if string(tag.value) != string(expectedTag.value) { + if !bytes.Equal(tag.value, expectedTag.value) { t.Errorf("At position %d, tag %d: expected value %s, got %s", i, j, string(expectedTag.value), string(tag.value)) } if tag.valueType != expectedTag.valueType { @@ -1393,7 +1394,7 @@ func TestQueryResult_Pull_ContextCancellation(t *testing.T) { mockProtector := &test.MockMemoryProtector{ExpectQuotaExceeded: false} - // Create cancelled context + // Create canceled context ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately @@ -1426,12 +1427,12 @@ func TestQueryResult_Pull_ContextCancellation(t *testing.T) { response := qr.Pull() - // With cancelled context, we should handle gracefully + // With canceled context, we should handle gracefully // The behavior depends on implementation - it might return nil or an error response if response != nil { - t.Logf("Response with cancelled context: error=%v, len=%d", response.Error, response.Len()) + t.Logf("Response with canceled context: error=%v, len=%d", response.Error, response.Len()) } else { - t.Log("Got nil response with cancelled context - this is acceptable") + t.Log("Got nil response with canceled context - this is acceptable") } qr.Release()
