This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3183f3490ea591e80c97f5e1b127d3fc9c6bf1db Author: Gao Hongtao <[email protected]> AuthorDate: Sat Aug 30 17:28:43 2025 +0800 Refactor sidx handling in trace storage to support multiple instances. Updated the tsTable structure to manage a map of sidx instances, enabling dynamic creation and retrieval. Enhanced flushing and merging processes to iterate over all sidx instances, improving error handling and logging. Additionally, modified trace preparation to accommodate multiple sidx write requests. --- banyand/internal/sidx/multi_sidx_query.go | 16 ++--- banyand/internal/sidx/multi_sidx_query_test.go | 7 +-- banyand/trace/flusher.go | 8 ++- banyand/trace/merger.go | 8 ++- banyand/trace/traces.go | 12 ++-- banyand/trace/tstable.go | 81 ++++++++++++++++++++------ banyand/trace/write_standalone.go | 30 +++++++--- 7 files changed, 112 insertions(+), 50 deletions(-) diff --git a/banyand/internal/sidx/multi_sidx_query.go b/banyand/internal/sidx/multi_sidx_query.go index 44873859..3424792b 100644 --- a/banyand/internal/sidx/multi_sidx_query.go +++ b/banyand/internal/sidx/multi_sidx_query.go @@ -46,7 +46,7 @@ import ( // - Results are merged maintaining ASC/DESC order as specified in QueryRequest // - MaxElementSize limits are respected across the merged result // - Partial failures are tolerated - returns success if at least one SIDX succeeds -// - All errors are aggregated and returned if no SIDX instances succeed +// - All errors are aggregated and returned if no SIDX instances succeed. func QueryMultipleSIDX(ctx context.Context, sidxs []SIDX, req QueryRequest) (*QueryResponse, error) { if len(sidxs) == 0 { return &QueryResponse{ @@ -136,7 +136,7 @@ func QueryMultipleSIDX(ctx context.Context, sidxs []SIDX, req QueryRequest) (*Qu // Merge multiple responses mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req) - + // Include partial failure information in the response if there were errors if aggregatedError != nil { mergedResponse.Error = aggregatedError @@ -174,12 +174,12 @@ func mergeMultipleSIDXResponses(responses []*QueryResponse, req QueryRequest) *Q return mergeQueryResponseShardsDesc(responses, req.MaxElementSize) } -// QueryMultipleSIDXWithOptions provides additional configuration options for multi-SIDX queries. +// MultiSIDXQueryOptions provides additional configuration options for multi-SIDX queries. // This is an extended version of QueryMultipleSIDX with more control over execution behavior. type MultiSIDXQueryOptions struct { // FailFast determines whether to return immediately on first error or collect all results FailFast bool - + // MinSuccessCount specifies minimum number of successful SIDX queries required // If less than MinSuccessCount succeed, the function returns an error MinSuccessCount int @@ -233,7 +233,7 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, sidxs []SIDX, req QueryRe go func(idx int, sidx SIDX) { defer wg.Done() resp, err := sidx.Query(queryCtx, req) - + select { case resultCh <- sidxResult{ response: resp, @@ -263,7 +263,7 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, sidxs []SIDX, req QueryRe failureCount++ aggregatedError = multierr.Append(aggregatedError, fmt.Errorf("SIDX[%d] query failed: %w", result.index, result.err)) - + // Fail fast on first error if enabled if opts.FailFast { cancel() // Cancel remaining queries @@ -292,11 +292,11 @@ func QueryMultipleSIDXWithOptions(ctx context.Context, sidxs []SIDX, req QueryRe } mergedResponse := mergeMultipleSIDXResponses(successfulResponses, req) - + // Include partial failure information if there were errors if aggregatedError != nil && !opts.FailFast { mergedResponse.Error = aggregatedError } return mergedResponse, nil -} \ No newline at end of file +} diff --git a/banyand/internal/sidx/multi_sidx_query_test.go b/banyand/internal/sidx/multi_sidx_query_test.go index a8f0aca4..93cafca6 100644 --- a/banyand/internal/sidx/multi_sidx_query_test.go +++ b/banyand/internal/sidx/multi_sidx_query_test.go @@ -37,18 +37,18 @@ type mockSIDX struct { delay bool } -func (m *mockSIDX) Write(ctx context.Context, reqs []WriteRequest) error { +func (m *mockSIDX) Write(_ context.Context, _ []WriteRequest) error { return nil // Not implemented for tests } -func (m *mockSIDX) Query(ctx context.Context, req QueryRequest) (*QueryResponse, error) { +func (m *mockSIDX) Query(_ context.Context, _ QueryRequest) (*QueryResponse, error) { if m.err != nil { return nil, m.err } return m.response, nil } -func (m *mockSIDX) Stats(ctx context.Context) (*Stats, error) { +func (m *mockSIDX) Stats(_ context.Context) (*Stats, error) { return &Stats{}, nil } @@ -326,7 +326,6 @@ func TestQueryMultipleSIDX_ContextCancellation(t *testing.T) { // Query should respect context cancellation _, err := QueryMultipleSIDX(ctx, []SIDX{sidx1}, req) - // May succeed if mock doesn't check context, but real implementation would respect cancellation // This test mainly verifies the context is properly passed through if err != nil { diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go index 3798e76c..3e4c2d5a 100644 --- a/banyand/trace/flusher.go +++ b/banyand/trace/flusher.go @@ -203,9 +203,11 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) if len(ind.flushed) < 1 { return } - if err := tst.sidx.Flush(); err != nil { - tst.l.Warn().Err(err).Msg("sidx flush failed") - return + for sidxName, sidxInstance := range tst.sidxMap { + if err := sidxInstance.Flush(); err != nil { + tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx flush failed") + return + } } end := time.Now() tst.incTotalFlushed(1) diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index 369a0f2f..d391f9bd 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -111,9 +111,11 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part if err != nil { return nil, err } - if err := tst.sidx.Merge(); err != nil { - tst.l.Warn().Err(err).Msg("sidx merge failed") - return nil, err + for sidxName, sidxInstance := range tst.sidxMap { + if err := sidxInstance.Merge(); err != nil { + tst.l.Warn().Err(err).Str("sidx", sidxName).Msg("sidx merge failed") + return nil, err + } } elapsed := time.Since(start) tst.incTotalMergeLatency(elapsed.Seconds(), typ) diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go index 4f6599f2..b812df08 100644 --- a/banyand/trace/traces.go +++ b/banyand/trace/traces.go @@ -196,12 +196,12 @@ func releaseTraces(t *traces) { var tracesPool = pool.Register[*traces]("trace-traces") type tracesInTable struct { - segment storage.Segment[*tsTable, option] - tsTable *tsTable - traces *traces - timeRange timestamp.TimeRange - sidxReqs []sidx.WriteRequest - shardID common.ShardID + segment storage.Segment[*tsTable, option] + tsTable *tsTable + traces *traces + sidxReqsMap map[string][]sidx.WriteRequest + timeRange timestamp.TimeRange + shardID common.ShardID } type tracesInGroup struct { diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go index 98b67237..91a90cc1 100644 --- a/banyand/trace/tstable.go +++ b/banyand/trace/tstable.go @@ -48,7 +48,7 @@ const ( type tsTable struct { l *logger.Logger fileSystem fs.FileSystem - sidx sidx.SIDX + sidxMap map[string]sidx.SIDX pm protector.Memory metrics *metrics snapshot *snapshot @@ -201,15 +201,7 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, if m != nil { tst.metrics = m.(*metrics) } - sidxOpts, err := sidx.NewOptions(filepath.Join(rootPath, "sidx"), option.protector) - if err != nil { - logger.Panicf("cannot create sidx options: %s", err) - } - sidx, err := sidx.NewSIDX(sidxOpts) - if err != nil { - logger.Panicf("cannot create sidx: %s", err) - } - tst.sidx = sidx + tst.sidxMap = make(map[string]sidx.SIDX) tst.gc.init(&tst) ee := fileSystem.ReadDir(rootPath) if len(ee) == 0 { @@ -269,6 +261,65 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, return t, nil } +func (tst *tsTable) getSidx(name string) (sidx.SIDX, bool) { + tst.RLock() + defer tst.RUnlock() + sidxInstance, ok := tst.sidxMap[name] + return sidxInstance, ok +} + +func (tst *tsTable) getOrCreateSidx(name string) (sidx.SIDX, error) { + if sidxInstance, ok := tst.getSidx(name); ok { + return sidxInstance, nil + } + + tst.Lock() + defer tst.Unlock() + + // Double-check after acquiring write lock + if sidxInstance, ok := tst.sidxMap[name]; ok { + return sidxInstance, nil + } + + // Create new sidx instance + sidxPath := filepath.Join(tst.root, "sidx", name) + sidxOpts, err := sidx.NewOptions(sidxPath, tst.option.protector) + if err != nil { + return nil, fmt.Errorf("cannot create sidx options for %s: %w", name, err) + } + + newSidx, err := sidx.NewSIDX(sidxOpts) + if err != nil { + return nil, fmt.Errorf("cannot create sidx for %s: %w", name, err) + } + + tst.sidxMap[name] = newSidx + return newSidx, nil +} + +// getAllSidx returns all sidx instances for potential multi-sidx queries. +func (tst *tsTable) getAllSidx() []sidx.SIDX { + tst.RLock() + defer tst.RUnlock() + + result := make([]sidx.SIDX, 0, len(tst.sidxMap)) + for _, sidxInstance := range tst.sidxMap { + result = append(result, sidxInstance) + } + return result +} + +func (tst *tsTable) closeSidxMap() error { + var lastErr error + for name, sidxInstance := range tst.sidxMap { + if err := sidxInstance.Close(); err != nil { + tst.l.Error().Err(err).Str("sidx", name).Msg("failed to close sidx") + lastErr = err + } + } + return lastErr +} + func (tst *tsTable) Close() error { if tst.loopCloser != nil { tst.loopCloser.Done() @@ -278,17 +329,11 @@ func (tst *tsTable) Close() error { defer tst.Unlock() tst.deleteMetrics() if tst.snapshot == nil { - if tst.sidx != nil { - return tst.sidx.Close() - } - return nil + return tst.closeSidxMap() } tst.snapshot.decRef() tst.snapshot = nil - if tst.sidx != nil { - return tst.sidx.Close() - } - return nil + return tst.closeSidxMap() } func (tst *tsTable) mustAddMemPart(mp *memPart) { diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index 0bf231e9..a4f3c275 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -153,10 +153,11 @@ func (w *writeCallback) prepareTracesInTable(eg *tracesInGroup, writeEvent *trac } et = &tracesInTable{ - timeRange: segment.GetTimeRange(), - tsTable: tstb, - traces: generateTraces(), - segment: segment, + timeRange: segment.GetTimeRange(), + tsTable: tstb, + traces: generateTraces(), + segment: segment, + sidxReqsMap: make(map[string][]sidx.WriteRequest), } et.traces.reset() eg.tables = append(eg.tables, et) @@ -263,7 +264,13 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEv SeriesID: series.ID, Key: key, } - tracesInTable.sidxReqs = append(tracesInTable.sidxReqs, writeReq) + + sidxName := indexRule.GetMetadata().GetName() + + if tracesInTable.sidxReqsMap[sidxName] == nil { + tracesInTable.sidxReqsMap[sidxName] = make([]sidx.WriteRequest, 0) + } + tracesInTable.sidxReqsMap[sidxName] = append(tracesInTable.sidxReqsMap[sidxName], writeReq) } return nil @@ -307,9 +314,16 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. for j := range g.tables { es := g.tables[j] es.tsTable.mustAddTraces(es.traces) - if len(es.sidxReqs) > 0 { - if err := es.tsTable.sidx.Write(ctx, es.sidxReqs); err != nil { - w.l.Error().Err(err).Msg("cannot write to secondary index") + for sidxName, sidxReqs := range es.sidxReqsMap { + if len(sidxReqs) > 0 { + sidxInstance, err := es.tsTable.getOrCreateSidx(sidxName) + if err != nil { + w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot get or create sidx instance") + continue + } + if err := sidxInstance.Write(ctx, sidxReqs); err != nil { + w.l.Error().Err(err).Str("sidx", sidxName).Msg("cannot write to secondary index") + } } } releaseTraces(es.traces)
