This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit f42535ef6fb7bd41a76b25e00e6cfa190fa4a079 Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 31 07:34:26 2025 +0800 Enhance trace querying by integrating entity handling and series ID mapping. --- banyand/trace/query.go | 36 ++++++++++++++++++++++-- banyand/trace/trace.go | 11 +++++--- pkg/query/logical/trace/trace_plan_local.go | 3 ++ pkg/query/logical/trace/trace_plan_tag_filter.go | 18 ++++++++---- pkg/query/model/model.go | 3 ++ 5 files changed, 59 insertions(+), 12 deletions(-) diff --git a/banyand/trace/query.go b/banyand/trace/query.go index 07b70afd..a0c328fc 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/internal/storage" @@ -40,7 +41,8 @@ const checkDoneEvery = 128 var nilResult = model.TraceQueryResult(nil) type queryOptions struct { - traceIDs []string + seriesToEntity map[common.SeriesID][]*modelv1.TagValue + traceIDs []string model.TraceQueryOptions minTimestamp int64 maxTimestamp int64 @@ -48,6 +50,7 @@ type queryOptions struct { func (qo *queryOptions) reset() { qo.TraceQueryOptions.Reset() + qo.seriesToEntity = nil qo.traceIDs = nil qo.minTimestamp = 0 qo.maxTimestamp = 0 @@ -55,6 +58,7 @@ func (qo *queryOptions) reset() { func (qo *queryOptions) copyFrom(other *queryOptions) { qo.TraceQueryOptions.CopyFrom(&other.TraceQueryOptions) + qo.seriesToEntity = other.seriesToEntity qo.traceIDs = make([]string, len(other.traceIDs)) copy(qo.traceIDs, other.traceIDs) qo.minTimestamp = other.minTimestamp @@ -104,6 +108,30 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T minTimestamp: tqo.TimeRange.Start.UnixNano(), maxTimestamp: tqo.TimeRange.End.UnixNano(), } + + // Process entities to get series IDs for sidx queries + if len(tqo.Entities) > 0 { + // Create series from entities for lookup + series := make([]*pbv1.Series, len(tqo.Entities)) + for i, entityValues := range tqo.Entities { + series[i] = &pbv1.Series{ + Subject: tqo.Name, + EntityValues: entityValues, + } + } + + // Use segment lookup to find actual series that exist in the data + qo.seriesToEntity = make(map[common.SeriesID][]*modelv1.TagValue) + for _, segment := range segments { + sl, lookupErr := segment.Lookup(ctx, series) + if lookupErr != nil { + return nil, fmt.Errorf("cannot lookup series: %w", lookupErr) + } + for _, s := range sl { + qo.seriesToEntity[s.ID] = s.EntityValues + } + } + } var n int tables := make([]*tsTable, 0) for _, segment := range segments { @@ -129,7 +157,11 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T if len(sidxInstances) > 0 { // Query sidx for trace IDs - traceIDs, sidxErr := t.querySidxForTraceIDs(ctx, sidxInstances, tqo) + var seriesIDs []common.SeriesID + for seriesID := range qo.seriesToEntity { + seriesIDs = append(seriesIDs, seriesID) + } + traceIDs, sidxErr := t.querySidxForTraceIDs(ctx, sidxInstances, tqo, seriesIDs) if sidxErr != nil { t.l.Warn().Err(sidxErr).Str("sidx", sidxName).Msg("sidx query failed, falling back to normal query") } else if len(traceIDs) > 0 { diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index 7e7cc594..dfa552d9 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -137,7 +137,7 @@ func (t *trace) parseSpec() { } // querySidxForTraceIDs queries sidx instances to get ordered trace IDs. -func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.SIDX, tqo model.TraceQueryOptions) ([]string, error) { +func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.SIDX, tqo model.TraceQueryOptions, seriesIDs []common.SeriesID) ([]string, error) { // Convert TraceQueryOptions to sidx.QueryRequest req := sidx.QueryRequest{ Filter: tqo.SkippingFilter, @@ -158,9 +158,12 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.S req.MaxKey = &maxKey } - // For now, use all series IDs (this could be optimized further) - // TODO: Consider filtering by relevant series IDs based on query context - req.SeriesIDs = []common.SeriesID{1} // Placeholder - should be dynamically determined + // Use the provided series IDs for targeted querying + if len(seriesIDs) > 0 { + req.SeriesIDs = seriesIDs + } else { + req.SeriesIDs = []common.SeriesID{1} + } // Query multiple sidx instances response, err := sidx.QueryMultipleSIDX(ctx, sidxInstances, req) diff --git a/pkg/query/logical/trace/trace_plan_local.go b/pkg/query/logical/trace/trace_plan_local.go index d73af754..71aebf07 100644 --- a/pkg/query/logical/trace/trace_plan_local.go +++ b/pkg/query/logical/trace/trace_plan_local.go @@ -22,6 +22,7 @@ import ( "fmt" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/iter" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -49,6 +50,7 @@ type localScan struct { projectionTags *model.TagProjection timeRange timestamp.TimeRange projectionTagRefs [][]*logical.TagRef + entities [][]*modelv1.TagValue traceIDs []string maxTraceSize int } @@ -90,6 +92,7 @@ func (i *localScan) Execute(ctx context.Context) (iter.Iterator[model.TraceResul SkippingFilter: i.skippingFilter, Order: orderBy, TagProjection: i.projectionTags, + Entities: i.entities, MaxTraceSize: i.maxTraceSize, TraceIDs: i.traceIDs, }); err != nil { diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index f868d49a..8bb3aeca 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -55,11 +55,13 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er var err error var conditionTagNames []string var traceIDs []string - // For trace, we only use skipping filter (no inverted filter or entities) - ctx.skippingFilter, conditionTagNames, traceIDs, err = buildTraceFilter(uis.criteria, s, entityDict, uis.traceIDTagName) + var entities [][]*modelv1.TagValue + // For trace, we use skipping filter and capture entities for query optimization + ctx.skippingFilter, entities, conditionTagNames, traceIDs, err = buildTraceFilter(uis.criteria, s, entityDict, uis.traceIDTagName) if err != nil { return nil, err } + ctx.entities = entities // Initialize projectionTags even if no explicit projection tags are provided ctx.projectionTags = &model.TagProjection{ @@ -117,6 +119,7 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext projectionTags: ctx.projectionTags, metadata: uis.metadata, skippingFilter: ctx.skippingFilter, + entities: ctx.entities, l: logger.GetLogger("query", "trace", "local-scan"), ec: ec, traceIDs: traceIDs, @@ -128,6 +131,7 @@ type traceAnalyzeContext struct { skippingFilter index.Filter projectionTags *model.TagProjection projTagsRefs [][]*logical.TagRef + entities [][]*modelv1.TagValue } func newTraceAnalyzerContext(s logical.Schema) *traceAnalyzeContext { @@ -151,9 +155,11 @@ func deduplicateStrings(strings []string) []string { // buildTraceFilter builds a filter for trace queries and returns both the filter and collected tag names. // Unlike stream, trace only needs skipping filter. -func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict map[string]int, traceIDTagName string) (index.Filter, []string, []string, error) { +func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict map[string]int, + traceIDTagName string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, error) { if criteria == nil { - return nil, nil, nil, nil + return nil, nil, nil, nil, nil } // Create a map of valid tag names from the schema tagNames := make(map[string]bool) @@ -162,8 +168,8 @@ func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict m tagNames[tagName] = true } - filter, _, collectedTagNames, traceIDs, err := buildFilter(criteria, tagNames, entityDict, nil, traceIDTagName) - return filter, collectedTagNames, traceIDs, err + filter, entities, collectedTagNames, traceIDs, err := buildFilter(criteria, tagNames, entityDict, nil, traceIDTagName) + return filter, entities, collectedTagNames, traceIDs, err } var ( diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index dda510ec..6a504bc1 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -322,6 +322,7 @@ type TraceQueryOptions struct { Order *index.OrderBy TagProjection *TagProjection Name string + Entities [][]*modelv1.TagValue TraceIDs []string MaxTraceSize int } @@ -333,6 +334,7 @@ func (t *TraceQueryOptions) Reset() { t.SkippingFilter = nil t.Order = nil t.TagProjection = nil + t.Entities = nil t.TraceIDs = nil t.MaxTraceSize = 0 } @@ -344,6 +346,7 @@ func (t *TraceQueryOptions) CopyFrom(other *TraceQueryOptions) { t.SkippingFilter = other.SkippingFilter t.Order = other.Order t.TagProjection = other.TagProjection + t.Entities = other.Entities t.MaxTraceSize = other.MaxTraceSize }
