This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3183f3490ea591e80c97f5e1b127d3fc9c6bf1db
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Aug 30 17:28:43 2025 +0800

    Refactor sidx handling in trace storage to support multiple instances. 
Update the tsTable structure to manage a map of sidx instances, enabling 
dynamic creation and retrieval. Enhance the flushing and merging processes to 
iterate over all sidx instances, improving error handling and logging. 
Additionally, modify trace preparation to accommodate multiple sidx write 
requests.
---
 banyand/internal/sidx/multi_sidx_query.go      | 16 ++---
 banyand/internal/sidx/multi_sidx_query_test.go |  7 +--
 banyand/trace/flusher.go                       |  8 ++-
 banyand/trace/merger.go                        |  8 ++-
 banyand/trace/traces.go                        | 12 ++--
 banyand/trace/tstable.go                       | 81 ++++++++++++++++++++------
 banyand/trace/write_standalone.go              | 30 +++++++---
 7 files changed, 112 insertions(+), 50 deletions(-)

diff --git a/banyand/internal/sidx/multi_sidx_query.go 
b/banyand/internal/sidx/multi_sidx_query.go
index 44873859..3424792b 100644
--- a/banyand/internal/sidx/multi_sidx_query.go
+++ b/banyand/internal/sidx/multi_sidx_query.go
@@ -46,7 +46,7 @@ import (
 // - Results are merged maintaining ASC/DESC order as specified in QueryRequest
 // - MaxElementSize limits are respected across the merged result
 // - Partial failures are tolerated - returns success if at least one SIDX 
succeeds
-// - All errors are aggregated and returned if no SIDX instances succeed
+// - All errors are aggregated and returned if no SIDX instances succeed.
 func QueryMultipleSIDX(ctx context.Context, sidxs []SIDX, req QueryRequest) 
(*QueryResponse, error) {
        if len(sidxs) == 0 {
                return &QueryResponse{
@@ -136,7 +136,7 @@ func QueryMultipleSIDX(ctx context.Context, sidxs []SIDX, 
req QueryRequest) (*Qu
 
        // Merge multiple responses
        mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req)
-       
+
        // Include partial failure information in the response if there were 
errors
        if aggregatedError != nil {
                mergedResponse.Error = aggregatedError
@@ -174,12 +174,12 @@ func mergeMultipleSIDXResponses(responses 
[]*QueryResponse, req QueryRequest) *Q
        return mergeQueryResponseShardsDesc(responses, req.MaxElementSize)
 }
 
-// QueryMultipleSIDXWithOptions provides additional configuration options for 
multi-SIDX queries.
+// MultiSIDXQueryOptions provides additional configuration options for 
multi-SIDX queries.
 // This is an extended version of QueryMultipleSIDX with more control over 
execution behavior.
 type MultiSIDXQueryOptions struct {
        // FailFast determines whether to return immediately on first error or 
collect all results
        FailFast bool
-       
+
        // MinSuccessCount specifies minimum number of successful SIDX queries 
required
        // If less than MinSuccessCount succeed, the function returns an error
        MinSuccessCount int
@@ -233,7 +233,7 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, 
sidxs []SIDX, req QueryRe
                go func(idx int, sidx SIDX) {
                        defer wg.Done()
                        resp, err := sidx.Query(queryCtx, req)
-                       
+
                        select {
                        case resultCh <- sidxResult{
                                response: resp,
@@ -263,7 +263,7 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, 
sidxs []SIDX, req QueryRe
                        failureCount++
                        aggregatedError = multierr.Append(aggregatedError,
                                fmt.Errorf("SIDX[%d] query failed: %w", 
result.index, result.err))
-                       
+
                        // Fail fast on first error if enabled
                        if opts.FailFast {
                                cancel() // Cancel remaining queries
@@ -292,11 +292,11 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, 
sidxs []SIDX, req QueryRe
        }
 
        mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req)
-       
+
        // Include partial failure information if there were errors
        if aggregatedError != nil && !opts.FailFast {
                mergedResponse.Error = aggregatedError
        }
 
        return mergedResponse, nil
-}
\ No newline at end of file
+}
diff --git a/banyand/internal/sidx/multi_sidx_query_test.go 
b/banyand/internal/sidx/multi_sidx_query_test.go
index a8f0aca4..93cafca6 100644
--- a/banyand/internal/sidx/multi_sidx_query_test.go
+++ b/banyand/internal/sidx/multi_sidx_query_test.go
@@ -37,18 +37,18 @@ type mockSIDX struct {
        delay    bool
 }
 
-func (m *mockSIDX) Write(ctx context.Context, reqs []WriteRequest) error {
+func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error {
        return nil // Not implemented for tests
 }
 
-func (m *mockSIDX) Query(ctx context.Context, req QueryRequest) 
(*QueryResponse, error) {
+func (m *mockSIDX) Query(_ context.Context, _ QueryRequest) (*QueryResponse, 
error) {
        if m.err != nil {
                return nil, m.err
        }
        return m.response, nil
 }
 
-func (m *mockSIDX) Stats(ctx context.Context) (*Stats, error) {
+func (m *mockSIDX) Stats(_ context.Context) (*Stats, error) {
        return &Stats{}, nil
 }
 
@@ -326,7 +326,6 @@ func TestQueryMultipleSIDX_ContextCancellation(t 
*testing.T) {
 
        // Query should respect context cancellation
        _, err := QueryMultipleSIDX(ctx, []SIDX{sidx1}, req)
-
        // May succeed if mock doesn't check context, but real implementation 
would respect cancellation
        // This test mainly verifies the context is properly passed through
        if err != nil {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 3798e76c..3e4c2d5a 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -203,9 +203,11 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
        if len(ind.flushed) < 1 {
                return
        }
-       if err := tst.sidx.Flush(); err != nil {
-               tst.l.Warn().Err(err).Msg("sidx flush failed")
-               return
+       for sidxName, sidxInstance := range tst.sidxMap {
+               if err := sidxInstance.Flush(); err != nil {
+                       tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx 
flush failed")
+                       return
+               }
        }
        end := time.Now()
        tst.incTotalFlushed(1)
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 369a0f2f..d391f9bd 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -111,9 +111,11 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator 
snapshotCreator, part
        if err != nil {
                return nil, err
        }
-       if err := tst.sidx.Merge(); err != nil {
-               tst.l.Warn().Err(err).Msg("sidx merge failed")
-               return nil, err
+       for sidxName, sidxInstance := range tst.sidxMap {
+               if err := sidxInstance.Merge(); err != nil {
+                       tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx 
merge failed")
+                       return nil, err
+               }
        }
        elapsed := time.Since(start)
        tst.incTotalMergeLatency(elapsed.Seconds(), typ)
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index 4f6599f2..b812df08 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -196,12 +196,12 @@ func releaseTraces(t *traces) {
 var tracesPool = pool.Register[*traces]("trace-traces")
 
 type tracesInTable struct {
-       segment   storage.Segment[*tsTable, option]
-       tsTable   *tsTable
-       traces    *traces
-       timeRange timestamp.TimeRange
-       sidxReqs  []sidx.WriteRequest
-       shardID   common.ShardID
+       segment     storage.Segment[*tsTable, option]
+       tsTable     *tsTable
+       traces      *traces
+       sidxReqsMap map[string][]sidx.WriteRequest
+       timeRange   timestamp.TimeRange
+       shardID     common.ShardID
 }
 
 type tracesInGroup struct {
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 98b67237..91a90cc1 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -48,7 +48,7 @@ const (
 type tsTable struct {
        l             *logger.Logger
        fileSystem    fs.FileSystem
-       sidx          sidx.SIDX
+       sidxMap       map[string]sidx.SIDX
        pm            protector.Memory
        metrics       *metrics
        snapshot      *snapshot
@@ -201,15 +201,7 @@ func initTSTable(fileSystem fs.FileSystem, rootPath 
string, p common.Position,
        if m != nil {
                tst.metrics = m.(*metrics)
        }
-       sidxOpts, err := sidx.NewOptions(filepath.Join(rootPath, "sidx"), 
option.protector)
-       if err != nil {
-               logger.Panicf("cannot create sidx options: %s", err)
-       }
-       sidx, err := sidx.NewSIDX(sidxOpts)
-       if err != nil {
-               logger.Panicf("cannot create sidx: %s", err)
-       }
-       tst.sidx = sidx
+       tst.sidxMap = make(map[string]sidx.SIDX)
        tst.gc.init(&tst)
        ee := fileSystem.ReadDir(rootPath)
        if len(ee) == 0 {
@@ -269,6 +261,65 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, 
p common.Position,
        return t, nil
 }
 
+func (tst *tsTable) getSidx(name string) (sidx.SIDX, bool) {
+       tst.RLock()
+       defer tst.RUnlock()
+       sidxInstance, ok := tst.sidxMap[name]
+       return sidxInstance, ok
+}
+
+func (tst *tsTable) getOrCreateSidx(name string) (sidx.SIDX, error) {
+       if sidxInstance, ok := tst.getSidx(name); ok {
+               return sidxInstance, nil
+       }
+
+       tst.Lock()
+       defer tst.Unlock()
+
+       // Double-check after acquiring write lock
+       if sidxInstance, ok := tst.sidxMap[name]; ok {
+               return sidxInstance, nil
+       }
+
+       // Create new sidx instance
+       sidxPath := filepath.Join(tst.root, "sidx", name)
+       sidxOpts, err := sidx.NewOptions(sidxPath, tst.option.protector)
+       if err != nil {
+               return nil, fmt.Errorf("cannot create sidx options for %s: %w", 
name, err)
+       }
+
+       newSidx, err := sidx.NewSIDX(sidxOpts)
+       if err != nil {
+               return nil, fmt.Errorf("cannot create sidx for %s: %w", name, 
err)
+       }
+
+       tst.sidxMap[name] = newSidx
+       return newSidx, nil
+}
+
+// getAllSidx returns all sidx instances for potential multi-sidx queries.
+func (tst *tsTable) getAllSidx() []sidx.SIDX {
+       tst.RLock()
+       defer tst.RUnlock()
+
+       result := make([]sidx.SIDX, 0, len(tst.sidxMap))
+       for _, sidxInstance := range tst.sidxMap {
+               result = append(result, sidxInstance)
+       }
+       return result
+}
+
+func (tst *tsTable) closeSidxMap() error {
+       var lastErr error
+       for name, sidxInstance := range tst.sidxMap {
+               if err := sidxInstance.Close(); err != nil {
+                       tst.l.Error().Err(err).Str("sidx", name).Msg("failed to 
close sidx")
+                       lastErr = err
+               }
+       }
+       return lastErr
+}
+
 func (tst *tsTable) Close() error {
        if tst.loopCloser != nil {
                tst.loopCloser.Done()
@@ -278,17 +329,11 @@ func (tst *tsTable) Close() error {
        defer tst.Unlock()
        tst.deleteMetrics()
        if tst.snapshot == nil {
-               if tst.sidx != nil {
-                       return tst.sidx.Close()
-               }
-               return nil
+               return tst.closeSidxMap()
        }
        tst.snapshot.decRef()
        tst.snapshot = nil
-       if tst.sidx != nil {
-               return tst.sidx.Close()
-       }
-       return nil
+       return tst.closeSidxMap()
 }
 
 func (tst *tsTable) mustAddMemPart(mp *memPart) {
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 0bf231e9..a4f3c275 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -153,10 +153,11 @@ func (w *writeCallback) prepareTracesInTable(eg 
*tracesInGroup, writeEvent *trac
                }
 
                et = &tracesInTable{
-                       timeRange: segment.GetTimeRange(),
-                       tsTable:   tstb,
-                       traces:    generateTraces(),
-                       segment:   segment,
+                       timeRange:   segment.GetTimeRange(),
+                       tsTable:     tstb,
+                       traces:      generateTraces(),
+                       segment:     segment,
+                       sidxReqsMap: make(map[string][]sidx.WriteRequest),
                }
                et.traces.reset()
                eg.tables = append(eg.tables, et)
@@ -263,7 +264,13 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable 
*tracesInTable, writeEv
                        SeriesID: series.ID,
                        Key:      key,
                }
-               tracesInTable.sidxReqs = append(tracesInTable.sidxReqs, 
writeReq)
+
+               sidxName := indexRule.GetMetadata().GetName()
+
+               if tracesInTable.sidxReqsMap[sidxName] == nil {
+                       tracesInTable.sidxReqsMap[sidxName] = 
make([]sidx.WriteRequest, 0)
+               }
+               tracesInTable.sidxReqsMap[sidxName] = 
append(tracesInTable.sidxReqsMap[sidxName], writeReq)
        }
 
        return nil
@@ -307,9 +314,16 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                for j := range g.tables {
                        es := g.tables[j]
                        es.tsTable.mustAddTraces(es.traces)
-                       if len(es.sidxReqs) > 0 {
-                               if err := es.tsTable.sidx.Write(ctx, 
es.sidxReqs); err != nil {
-                                       w.l.Error().Err(err).Msg("cannot write 
to secondary index")
+                       for sidxName, sidxReqs := range es.sidxReqsMap {
+                               if len(sidxReqs) > 0 {
+                                       sidxInstance, err := 
es.tsTable.getOrCreateSidx(sidxName)
+                                       if err != nil {
+                                               
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx 
instance")
+                                               continue
+                                       }
+                                       if err := sidxInstance.Write(ctx, 
sidxReqs); err != nil {
+                                               
w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary 
index")
+                                       }
                                }
                        }
                        releaseTraces(es.traces)

Reply via email to