This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9289b47c7dec3b548481a6289f09eca37466c76e
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Aug 30 16:44:51 2025 +0800

    Enhance sidx query process by integrating query result handling directly 
into the workflow, further simplifying the codebase and improving performance. 
This update removes the QueryResult interface and refines the queryResult 
struct for better maintainability.
---
 banyand/internal/sidx/multi_sidx_query.go      | 302 ++++++++++++++
 banyand/internal/sidx/multi_sidx_query_test.go | 525 +++++++++++++++++++++++++
 2 files changed, 827 insertions(+)

diff --git a/banyand/internal/sidx/multi_sidx_query.go 
b/banyand/internal/sidx/multi_sidx_query.go
new file mode 100644
index 00000000..44873859
--- /dev/null
+++ b/banyand/internal/sidx/multi_sidx_query.go
@@ -0,0 +1,302 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+)
+
+// QueryMultipleSIDX executes the same query across multiple SIDX instances in
+// parallel and merges their results while maintaining the requested ordering.
+// It is useful when data is partitioned across multiple SIDX instances and a
+// unified view is needed.
+//
+// Parameters:
+//   - ctx: context handed unchanged to every SIDX query
+//   - sidxs: SIDX instances to query
+//   - req: QueryRequest executed on every SIDX instance
+//
+// Returns:
+//   - *QueryResponse: merged response of all successful SIDX queries
+//   - error: a request validation error, or the aggregated query errors when
+//     every SIDX query failed; nil otherwise
+//
+// Behavior:
+//   - An empty sidxs slice yields an empty, non-nil response.
+//   - A single SIDX is queried directly and its result is returned as is.
+//   - Otherwise all instances are queried concurrently and the successful
+//     responses are merged by mergeMultipleSIDXResponses according to
+//     req.Order and req.MaxElementSize.
+//   - Partial failures are tolerated: as long as one query yields a response
+//     the call succeeds and the aggregated failures are exposed through
+//     QueryResponse.Error.
+//   - Responses and errors are processed in the order of sidxs, so the outcome
+//     does not depend on which query happens to finish first.
+func QueryMultipleSIDX(ctx context.Context, sidxs []SIDX, req QueryRequest) (*QueryResponse, error) {
+       if len(sidxs) == 0 {
+               return &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }, nil
+       }
+
+       // Validate the request once before distributing it to the SIDX instances.
+       if err := req.Validate(); err != nil {
+               return nil, fmt.Errorf("invalid query request: %w", err)
+       }
+
+       // Single SIDX optimization: nothing to run in parallel or to merge.
+       if len(sidxs) == 1 {
+               return sidxs[0].Query(ctx, req)
+       }
+
+       type sidxResult struct {
+               response *QueryResponse
+               err      error
+       }
+
+       // Each goroutine owns exactly one slot of results, so no locking is
+       // required and the slots stay in the order of sidxs.
+       results := make([]sidxResult, len(sidxs))
+       var wg sync.WaitGroup
+       for i, sidxInstance := range sidxs {
+               wg.Add(1)
+               go func(idx int, sidx SIDX) {
+                       defer wg.Done()
+                       resp, err := sidx.Query(ctx, req)
+                       results[idx] = sidxResult{response: resp, err: err}
+               }(i, sidxInstance)
+       }
+       wg.Wait()
+
+       // Split the outcomes into successful responses and aggregated failures.
+       successfulResponses := make([]*QueryResponse, 0, len(results))
+       var aggregatedError error
+       for idx, result := range results {
+               switch {
+               case result.err != nil:
+                       aggregatedError = multierr.Append(aggregatedError,
+                               fmt.Errorf("SIDX[%d] query failed: %w", idx, result.err))
+               case result.response != nil:
+                       successfulResponses = append(successfulResponses, result.response)
+               }
+       }
+
+       if len(successfulResponses) == 0 {
+               if aggregatedError != nil {
+                       return nil, fmt.Errorf("all SIDX queries failed: %w", aggregatedError)
+               }
+               // Every query returned (nil, nil): report an empty result set.
+               return &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }, nil
+       }
+
+       // mergeMultipleSIDXResponses returns a lone response unchanged, so the
+       // single-success case needs no special handling here.
+       mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req)
+
+       // Surface partial failures alongside the data that could be retrieved.
+       if aggregatedError != nil {
+               mergedResponse.Error = aggregatedError
+       }
+
+       return mergedResponse, nil
+}
+
+// mergeMultipleSIDXResponses combines the responses of several SIDX queries
+// into one ordered response. No responses yield an empty, non-nil response and
+// a single response is returned unchanged. Otherwise the work is delegated to
+// mergeQueryResponseShardsAsc or mergeQueryResponseShardsDesc, with
+// req.MaxElementSize passed through. Ascending order is used unless req.Order
+// is set and extractOrdering reports a descending order.
+func mergeMultipleSIDXResponses(responses []*QueryResponse, req QueryRequest) *QueryResponse {
+       switch len(responses) {
+       case 0:
+               return &QueryResponse{
+                       Keys: make([]int64, 0),
+                       Data: make([][]byte, 0),
+                       Tags: make([][]Tag, 0),
+                       SIDs: make([]common.SeriesID, 0),
+               }
+       case 1:
+               return responses[0]
+       }
+
+       // extractOrdering is only consulted when the request carries an order.
+       descending := req.Order != nil && !extractOrdering(req)
+       if descending {
+               return mergeQueryResponseShardsDesc(responses, req.MaxElementSize)
+       }
+       return mergeQueryResponseShardsAsc(responses, req.MaxElementSize)
+}
+
+// MultiSIDXQueryOptions configures the execution behavior of
+// QueryMultipleSIDXWithOptions.
+type MultiSIDXQueryOptions struct {
+       // FailFast determines whether to return an error as soon as the first
+       // SIDX failure is observed instead of collecting all results.
+       FailFast bool
+
+       // MinSuccessCount specifies the minimum number of successful SIDX queries
+       // required. If fewer succeed, QueryMultipleSIDXWithOptions returns an
+       // error. Values <= 0 are treated as 1.
+       MinSuccessCount int
+}
+
+// QueryMultipleSIDXWithOptions executes the same query on every SIDX instance
+// in parallel and merges the results, honoring the constraints in opts.
+//
+// Behavior:
+//   - opts.MinSuccessCount <= 0 is treated as 1.
+//   - An empty sidxs slice is an error, because the minimum success count can
+//     never be met.
+//   - Without FailFast and with a minimum success count of 1 the call is
+//     delegated to QueryMultipleSIDX.
+//   - With FailFast the first observed query error cancels the remaining
+//     queries and is returned immediately.
+//   - If fewer than MinSuccessCount queries produce a response an error is
+//     returned; it wraps the aggregated query errors when there are any.
+//   - On success the responses are merged in the order of sidxs and the
+//     aggregated errors of failed queries are exposed via QueryResponse.Error.
+func QueryMultipleSIDXWithOptions(ctx context.Context, sidxs []SIDX, req QueryRequest, opts MultiSIDXQueryOptions) (*QueryResponse, error) {
+       if opts.MinSuccessCount <= 0 {
+               opts.MinSuccessCount = 1 // Default: at least one success required
+       }
+
+       // MinSuccessCount is at least 1 here, so zero instances can never satisfy it.
+       if len(sidxs) == 0 {
+               return nil, fmt.Errorf("no SIDX instances provided, cannot meet minimum success count of %d", opts.MinSuccessCount)
+       }
+
+       // Validate the request once before distributing it.
+       if err := req.Validate(); err != nil {
+               return nil, fmt.Errorf("invalid query request: %w", err)
+       }
+
+       // The plain function already implements the default semantics.
+       if !opts.FailFast && opts.MinSuccessCount == 1 {
+               return QueryMultipleSIDX(ctx, sidxs, req)
+       }
+
+       type sidxResult struct {
+               response *QueryResponse
+               err      error
+               index    int
+       }
+
+       resultCh := make(chan sidxResult, len(sidxs))
+       var wg sync.WaitGroup
+
+       // queryCtx lets a fail-fast return cancel the queries still in flight.
+       queryCtx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       for i, sidxInstance := range sidxs {
+               wg.Add(1)
+               go func(idx int, sidx SIDX) {
+                       defer wg.Done()
+                       resp, err := sidx.Query(queryCtx, req)
+                       // resultCh has room for one result per SIDX, so this send
+                       // never blocks, even after the collector has returned early.
+                       // Sending unconditionally avoids dropping finished results
+                       // at random once the context is canceled.
+                       resultCh <- sidxResult{response: resp, err: err, index: idx}
+               }(i, sidxInstance)
+       }
+
+       // Close the channel once every query has reported its result.
+       go func() {
+               wg.Wait()
+               close(resultCh)
+       }()
+
+       // Outcomes are stored by SIDX index so that merging and error
+       // aggregation do not depend on completion order.
+       responses := make([]*QueryResponse, len(sidxs))
+       queryErrs := make([]error, len(sidxs))
+       successCount := 0
+       failureCount := 0
+
+       for result := range resultCh {
+               switch {
+               case result.err != nil:
+                       if opts.FailFast {
+                               cancel() // Cancel remaining queries
+                               return nil, fmt.Errorf("query failed fast at SIDX[%d]: %w", result.index, result.err)
+                       }
+                       failureCount++
+                       queryErrs[result.index] = fmt.Errorf("SIDX[%d] query failed: %w", result.index, result.err)
+               case result.response != nil:
+                       successCount++
+                       responses[result.index] = result.response
+               }
+       }
+
+       // multierr.Append ignores nil errors, so untouched slots are skipped.
+       var aggregatedError error
+       for _, queryErr := range queryErrs {
+               aggregatedError = multierr.Append(aggregatedError, queryErr)
+       }
+
+       if successCount < opts.MinSuccessCount {
+               // Queries that return (nil, nil) count neither as success nor as
+               // failure, so there may be no error to wrap; %w with a nil error
+               // would render as "%!w(<nil>)".
+               if aggregatedError == nil {
+                       return nil, fmt.Errorf("insufficient successful queries: got %d, required %d (failures: %d)",
+                               successCount, opts.MinSuccessCount, failureCount)
+               }
+               return nil, fmt.Errorf("insufficient successful queries: got %d, required %d (failures: %d): %w",
+                       successCount, opts.MinSuccessCount, failureCount, aggregatedError)
+       }
+
+       // At least MinSuccessCount (>= 1) responses exist at this point.
+       successfulResponses := make([]*QueryResponse, 0, successCount)
+       for _, resp := range responses {
+               if resp != nil {
+                       successfulResponses = append(successfulResponses, resp)
+               }
+       }
+
+       mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req)
+
+       // Surface partial failures alongside the data that could be retrieved.
+       if aggregatedError != nil {
+               mergedResponse.Error = aggregatedError
+       }
+
+       return mergedResponse, nil
+}
\ No newline at end of file
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go 
b/banyand/internal/sidx/multi_sidx_query_test.go
new file mode 100644
index 00000000..a8f0aca4
--- /dev/null
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -0,0 +1,525 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// mockSIDX is a test implementation of the SIDX interface for testing
+// multi-SIDX queries. Its Query method replays the configured err or response.
+type mockSIDX struct {
+       // err, when non-nil, is returned by Query instead of response.
+       err      error
+       // response is returned by Query when err is nil.
+       response *QueryResponse
+       // name labels the mock; no method in this file reads it.
+       name     string
+       // delay is not read by any method in this file.
+       delay    bool
+}
+
+// Write is a no-op stub that ignores its arguments and always returns nil.
+func (m *mockSIDX) Write(ctx context.Context, reqs []WriteRequest) error {
+       return nil // Not implemented for tests
+}
+
+// Query replays the canned outcome of the mock: the configured error when one
+// is set, otherwise the configured response. ctx and req are ignored.
+func (m *mockSIDX) Query(ctx context.Context, req QueryRequest) (*QueryResponse, error) {
+       if m.err == nil {
+               return m.response, nil
+       }
+       return nil, m.err
+}
+
+// Stats is a stub that always returns an empty Stats value and a nil error.
+func (m *mockSIDX) Stats(ctx context.Context) (*Stats, error) {
+       return &Stats{}, nil
+}
+
+// Close is a no-op stub that always returns nil.
+func (m *mockSIDX) Close() error {
+       return nil
+}
+
+// Flush is a no-op stub that always returns nil.
+func (m *mockSIDX) Flush() error {
+       return nil
+}
+
+// Merge is a no-op stub that always returns nil.
+func (m *mockSIDX) Merge() error {
+       return nil
+}
+
+// createMockQueryResponse builds a QueryResponse holding count elements.
+// Element i has key keyStart+i, series ID seriesIDBase+i, data "data_<c>" and
+// a single string tag "tag1" with value "value_<c>", where <c> is the rune
+// 'A'+i.
+func createMockQueryResponse(keyStart int64, count int, seriesIDBase common.SeriesID) *QueryResponse {
+       resp := &QueryResponse{
+               Keys: make([]int64, 0, count),
+               Data: make([][]byte, 0, count),
+               Tags: make([][]Tag, 0, count),
+               SIDs: make([]common.SeriesID, 0, count),
+       }
+
+       for offset := 0; offset < count; offset++ {
+               suffix := string(rune('A' + offset))
+               tag := Tag{
+                       Name:      "tag1",
+                       Value:     []byte("value_" + suffix),
+                       ValueType: pbv1.ValueTypeStr,
+               }
+
+               resp.Keys = append(resp.Keys, keyStart+int64(offset))
+               resp.Data = append(resp.Data, []byte("data_"+suffix))
+               resp.Tags = append(resp.Tags, []Tag{tag})
+               resp.SIDs = append(resp.SIDs, seriesIDBase+common.SeriesID(offset))
+       }
+
+       return resp
+}
+
+// TestQueryMultipleSIDX_EmptyInput checks that both a nil and an empty SIDX
+// slice produce an empty response without an error.
+func TestQueryMultipleSIDX_EmptyInput(t *testing.T) {
+       background := context.Background()
+       emptyReq := QueryRequest{}
+
+       for _, instances := range [][]SIDX{nil, {}} {
+               got, err := QueryMultipleSIDX(background, instances, emptyReq)
+               require.NoError(t, err)
+               assert.Empty(t, got.Keys)
+               assert.Empty(t, got.Data)
+               assert.Empty(t, got.Tags)
+               assert.Empty(t, got.SIDs)
+       }
+}
+
+// TestQueryMultipleSIDX_InvalidRequest checks that a request rejected by
+// validation is reported as an "invalid query request" error.
+func TestQueryMultipleSIDX_InvalidRequest(t *testing.T) {
+       // The request carries no series IDs, which is expected to fail validation.
+       invalidReq := QueryRequest{
+               SeriesIDs: []common.SeriesID{},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+       instances := []SIDX{&mockSIDX{name: "sidx1"}}
+
+       _, queryErr := QueryMultipleSIDX(context.Background(), instances, invalidReq)
+       require.Error(t, queryErr)
+       assert.Contains(t, queryErr.Error(), "invalid query request")
+}
+
+// TestQueryMultipleSIDX_SingleSIDX checks that the response of a lone SIDX
+// instance is returned as is.
+func TestQueryMultipleSIDX_SingleSIDX(t *testing.T) {
+       want := createMockQueryResponse(1, 3, 100)
+       only := &mockSIDX{name: "sidx1", response: want}
+
+       got, err := QueryMultipleSIDX(
+               context.Background(),
+               []SIDX{only},
+               QueryRequest{
+                       SeriesIDs: []common.SeriesID{100, 101, 102},
+                       MinKey:    int64Ptr(1),
+                       MaxKey:    int64Ptr(10),
+               },
+       )
+       require.NoError(t, err)
+       assert.Equal(t, want, got)
+}
+
+func TestQueryMultipleSIDX_MultipleSIDXSuccess(t *testing.T) {
+       ctx := context.Background()
+
+       // Create two SIDX instances with different data ranges
+       sidx1 := &mockSIDX{
+               name:     "sidx1",
+               response: createMockQueryResponse(1, 3, 100), // Keys: 1,2,3
+       }
+       sidx2 := &mockSIDX{
+               name:     "sidx2",
+               response: createMockQueryResponse(4, 2, 200), // Keys: 4,5
+       }
+
+       req := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102, 200, 201},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+               // Order: nil defaults to ascending
+       }
+
+       resp, err := QueryMultipleSIDX(ctx, []SIDX{sidx1, sidx2}, req)
+       require.NoError(t, err)
+       require.NotNil(t, resp)
+
+       // Verify merged results are properly ordered (ascending)
+       expectedKeys := []int64{1, 2, 3, 4, 5}
+       assert.Equal(t, expectedKeys, resp.Keys)
+       assert.Len(t, resp.Data, 5)
+       assert.Len(t, resp.Tags, 5)
+       assert.Len(t, resp.SIDs, 5)
+
+       // Verify ordering is maintained
+       for i := 1; i < len(resp.Keys); i++ {
+               assert.True(t, resp.Keys[i] > resp.Keys[i-1], "Keys should be 
in ascending order")
+       }
+}
+
+// TestQueryMultipleSIDX_DescendingOrder feeds two responses whose keys are
+// sorted in descending order directly into mergeMultipleSIDXResponses. The
+// request does not set Order, and only the number of merged keys is asserted.
+func TestQueryMultipleSIDX_DescendingOrder(t *testing.T) {
+       shards := []*QueryResponse{
+               {
+                       Keys: []int64{3, 2, 1}, // Already in descending order
+                       Data: [][]byte{[]byte("data3"), []byte("data2"), []byte("data1")},
+                       Tags: [][]Tag{{}, {}, {}},
+                       SIDs: []common.SeriesID{100, 101, 102},
+               },
+               {
+                       Keys: []int64{5, 4}, // Already in descending order
+                       Data: [][]byte{[]byte("data5"), []byte("data4")},
+                       Tags: [][]Tag{{}, {}},
+                       SIDs: []common.SeriesID{200, 201},
+               },
+       }
+
+       query := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102, 200, 201},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       // The merge helper is called directly instead of going through the
+       // SIDX interface.
+       result := mergeMultipleSIDXResponses(shards, query)
+       require.NotNil(t, result)
+
+       // All five input elements must survive the merge.
+       assert.Len(t, result.Keys, 5)
+}
+
+func TestQueryMultipleSIDX_MaxElementSizeLimit(t *testing.T) {
+       ctx := context.Background()
+
+       sidx1 := &mockSIDX{
+               name:     "sidx1",
+               response: createMockQueryResponse(1, 5, 100), // 5 elements
+       }
+       sidx2 := &mockSIDX{
+               name:     "sidx2",
+               response: createMockQueryResponse(6, 5, 200), // 5 elements
+       }
+
+       req := QueryRequest{
+               SeriesIDs:      []common.SeriesID{100, 101, 102, 103, 104, 200, 
201, 202, 203, 204},
+               MinKey:         int64Ptr(1),
+               MaxKey:         int64Ptr(15),
+               MaxElementSize: 3, // Limit to 3 elements
+       }
+
+       resp, err := QueryMultipleSIDX(ctx, []SIDX{sidx1, sidx2}, req)
+       require.NoError(t, err)
+       require.NotNil(t, resp)
+
+       // Should be limited to 3 elements
+       assert.Len(t, resp.Keys, 3)
+       assert.Len(t, resp.Data, 3)
+       assert.Len(t, resp.Tags, 3)
+       assert.Len(t, resp.SIDs, 3)
+
+       // Should be the first 3 in order (1,2,3)
+       expectedKeys := []int64{1, 2, 3}
+       assert.Equal(t, expectedKeys, resp.Keys)
+}
+
+// TestQueryMultipleSIDX_PartialFailures checks that one failing instance does
+// not fail the whole query: the data of the healthy instance is returned and
+// the failure is reported through the Error field of the response.
+func TestQueryMultipleSIDX_PartialFailures(t *testing.T) {
+       healthy := &mockSIDX{name: "sidx1", response: createMockQueryResponse(1, 3, 100)}
+       broken := &mockSIDX{name: "sidx2", err: errors.New("query failed")}
+
+       query := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       partial, err := QueryMultipleSIDX(context.Background(), []SIDX{healthy, broken}, query)
+       require.NoError(t, err) // Partial results still count as success.
+       require.NotNil(t, partial)
+
+       // Data comes from the healthy instance only.
+       assert.Len(t, partial.Keys, 3)
+       assert.Equal(t, []int64{1, 2, 3}, partial.Keys)
+
+       // The failure of the second instance is attached to the response.
+       assert.NotNil(t, partial.Error)
+       failure := partial.Error.Error()
+       assert.Contains(t, failure, "SIDX[1] query failed")
+       assert.Contains(t, failure, "query failed")
+}
+
+// TestQueryMultipleSIDX_AllFailures checks that the query fails with an
+// aggregated error naming every cause when all instances fail.
+func TestQueryMultipleSIDX_AllFailures(t *testing.T) {
+       instances := []SIDX{
+               &mockSIDX{name: "sidx1", err: errors.New("connection failed")},
+               &mockSIDX{name: "sidx2", err: errors.New("timeout occurred")},
+       }
+
+       query := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       got, queryErr := QueryMultipleSIDX(context.Background(), instances, query)
+       require.Error(t, queryErr)
+       assert.Nil(t, got)
+
+       message := queryErr.Error()
+       assert.Contains(t, message, "all SIDX queries failed")
+       assert.Contains(t, message, "connection failed")
+       assert.Contains(t, message, "timeout occurred")
+}
+
+// TestQueryMultipleSIDX_ContextCancellation runs a query with a context that
+// is already canceled. The mock ignores the context, so the call may succeed;
+// if an error is returned it must mention the context.
+func TestQueryMultipleSIDX_ContextCancellation(t *testing.T) {
+       canceledCtx, cancel := context.WithCancel(context.Background())
+       // Cancel before the query starts.
+       cancel()
+
+       instance := &mockSIDX{
+               name:     "sidx1",
+               response: createMockQueryResponse(1, 3, 100),
+       }
+
+       query := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       // This mainly verifies that the context is passed through to the SIDX.
+       if _, err := QueryMultipleSIDX(canceledCtx, []SIDX{instance}, query); err != nil {
+               assert.Contains(t, err.Error(), "context")
+       }
+}
+
+// TestQueryMultipleSIDXWithOptions_FailFast checks that, with FailFast set, a
+// failing SIDX instance makes the whole query fail with a "query failed fast"
+// error that wraps the original cause, even though another instance succeeds.
+func TestQueryMultipleSIDXWithOptions_FailFast(t *testing.T) {
+       ctx := context.Background()
+
+       sidx1 := &mockSIDX{
+               name:     "sidx1",
+               response: createMockQueryResponse(1, 3, 100),
+       }
+       sidx2 := &mockSIDX{
+               name: "sidx2",
+               err:  errors.New("immediate failure"),
+       }
+
+       req := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       opts := MultiSIDXQueryOptions{
+               FailFast:        true,
+               MinSuccessCount: 1,
+       }
+
+       resp, err := QueryMultipleSIDXWithOptions(ctx, []SIDX{sidx1, sidx2}, req, opts)
+
+       // sidx2 always fails and its result is always delivered, so the error is
+       // required rather than optional; a conditional check would let a broken
+       // fail-fast path pass silently.
+       require.Error(t, err)
+       assert.Nil(t, resp)
+       assert.Contains(t, err.Error(), "query failed fast")
+       assert.Contains(t, err.Error(), "immediate failure")
+}
+
+func TestQueryMultipleSIDXWithOptions_MinSuccessCount(t *testing.T) {
+       ctx := context.Background()
+
+       sidx1 := &mockSIDX{
+               name:     "sidx1",
+               response: createMockQueryResponse(1, 3, 100),
+       }
+       sidx2 := &mockSIDX{
+               name: "sidx2",
+               err:  errors.New("failure"),
+       }
+       sidx3 := &mockSIDX{
+               name: "sidx3",
+               err:  errors.New("another failure"),
+       }
+
+       req := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       opts := MultiSIDXQueryOptions{
+               FailFast:        false,
+               MinSuccessCount: 2, // Require at least 2 successes
+       }
+
+       _, err := QueryMultipleSIDXWithOptions(ctx, []SIDX{sidx1, sidx2, 
sidx3}, req, opts)
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "insufficient successful queries: got 
1, required 2")
+}
+
+// TestQueryMultipleSIDXWithOptions_Success checks that two healthy instances
+// satisfy a MinSuccessCount of 2 and that their responses are merged in
+// ascending key order.
+func TestQueryMultipleSIDXWithOptions_Success(t *testing.T) {
+       instances := []SIDX{
+               &mockSIDX{name: "sidx1", response: createMockQueryResponse(1, 3, 100)},
+               &mockSIDX{name: "sidx2", response: createMockQueryResponse(4, 2, 200)},
+       }
+
+       query := QueryRequest{
+               SeriesIDs: []common.SeriesID{100, 101, 102, 200, 201},
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(10),
+       }
+
+       options := MultiSIDXQueryOptions{FailFast: false, MinSuccessCount: 2}
+
+       merged, err := QueryMultipleSIDXWithOptions(context.Background(), instances, query, options)
+       require.NoError(t, err)
+       require.NotNil(t, merged)
+
+       assert.Len(t, merged.Keys, 5)
+       assert.Equal(t, []int64{1, 2, 3, 4, 5}, merged.Keys)
+}
+
+// TestMergeMultipleSIDXResponses exercises the merge helper directly with two
+// responses covering adjacent key ranges.
+func TestMergeMultipleSIDXResponses(t *testing.T) {
+       shards := []*QueryResponse{
+               createMockQueryResponse(1, 3, 100), // Keys: 1,2,3
+               createMockQueryResponse(4, 2, 200), // Keys: 4,5
+       }
+
+       // A zero-value request has a nil Order, so the merge is ascending.
+       result := mergeMultipleSIDXResponses(shards, QueryRequest{})
+       require.NotNil(t, result)
+
+       assert.Equal(t, []int64{1, 2, 3, 4, 5}, result.Keys)
+       assert.Len(t, result.Data, 5)
+       assert.Len(t, result.Tags, 5)
+       assert.Len(t, result.SIDs, 5)
+}
+
+// TestMergeMultipleSIDXResponses_OverlappingKeys merges two responses whose
+// key ranges interleave and checks that keys and data stay aligned.
+func TestMergeMultipleSIDXResponses_OverlappingKeys(t *testing.T) {
+       oddKeys := &QueryResponse{
+               Keys: []int64{1, 3, 5},
+               Data: [][]byte{[]byte("data1"), []byte("data3"), []byte("data5")},
+               Tags: [][]Tag{{}, {}, {}},
+               SIDs: []common.SeriesID{100, 101, 102},
+       }
+       evenKeys := &QueryResponse{
+               Keys: []int64{2, 4, 6},
+               Data: [][]byte{[]byte("data2"), []byte("data4"), []byte("data6")},
+               Tags: [][]Tag{{}, {}, {}},
+               SIDs: []common.SeriesID{200, 201, 202},
+       }
+
+       // A zero-value request has a nil Order, so the merge is ascending.
+       result := mergeMultipleSIDXResponses([]*QueryResponse{oddKeys, evenKeys}, QueryRequest{})
+       require.NotNil(t, result)
+
+       // The two inputs must be perfectly interleaved.
+       assert.Equal(t, []int64{1, 2, 3, 4, 5, 6}, result.Keys)
+
+       // Each payload must still sit next to the key it was created for.
+       for pos, key := range result.Keys {
+               want := []byte("data" + string(rune('0'+key)))
+               assert.Equal(t, want, result.Data[pos])
+       }
+}
+
+// Benchmark tests for performance validation.
+
+// BenchmarkQueryMultipleSIDX_TwoInstances measures querying and merging two
+// mock SIDX instances that each return 100 elements.
+func BenchmarkQueryMultipleSIDX_TwoInstances(b *testing.B) {
+       background := context.Background()
+
+       first := &mockSIDX{name: "sidx1", response: createMockQueryResponse(1, 100, 100)}
+       second := &mockSIDX{name: "sidx2", response: createMockQueryResponse(101, 100, 200)}
+
+       query := QueryRequest{
+               SeriesIDs: generateSeriesIDs(200),
+               MinKey:    int64Ptr(1),
+               MaxKey:    int64Ptr(300),
+       }
+
+       // Exclude the fixture setup above from the measurement.
+       b.ResetTimer()
+       for iter := 0; iter < b.N; iter++ {
+               if _, err := QueryMultipleSIDX(background, []SIDX{first, second}, query); err != nil {
+                       b.Fatalf("Unexpected error: %v", err)
+               }
+       }
+}
+
+// Helper functions.
+
+// int64Ptr returns a pointer to a fresh int64 holding v.
+func int64Ptr(v int64) *int64 {
+       ptr := new(int64)
+       *ptr = v
+       return ptr
+}
+
+// generateSeriesIDs returns count consecutive series IDs starting at 100.
+func generateSeriesIDs(count int) []common.SeriesID {
+       ids := make([]common.SeriesID, count)
+       for pos := range ids {
+               ids[pos] = common.SeriesID(100 + pos)
+       }
+       return ids
+}

Reply via email to