This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bbde88e305c4da24bbabd44a63cbe533a2babd02 Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 31 21:42:34 2025 +0800 Enhance trace query processing by introducing timestamp handling and integrating min/max value parameters in query options. Updated related functions to support new timestamp functionality and improved orderByTag logic for better query optimization. --- banyand/query/processor.go | 4 +++- banyand/trace/trace.go | 10 ++-------- pkg/query/logical/trace/schema.go | 4 ++-- pkg/query/logical/trace/trace_analyzer.go | 19 ++++++++++++++++--- pkg/query/logical/trace/trace_plan_local.go | 2 ++ pkg/query/logical/trace/trace_plan_tag_filter.go | 4 ++++ pkg/query/model/model.go | 2 ++ 7 files changed, 31 insertions(+), 14 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index b9076b3c..29dd1f35 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -465,6 +465,7 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t var schemas []logical.Schema var ecc []executor.TraceExecutionContext var traceIDTagName string + var timestampTagName string for i := range queryCriteria.Groups { meta := &commonv1.Metadata{ Name: queryCriteria.Name, @@ -489,9 +490,10 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t return } traceIDTagName = ec.GetSchema().GetTraceIdTagName() + timestampTagName = ec.GetSchema().GetTimestampTagName() } - plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas, ecc, traceIDTagName) + plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas, ecc, traceIDTagName, timestampTagName) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for trace %s: %v", queryCriteria.GetName(), err)) return diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index 952c313d..ad29e193 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -143,6 +143,8 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.S Filter: tqo.SkippingFilter, Order: tqo.Order, MaxElementSize: tqo.MaxTraceSize, + MinKey: &tqo.MinVal, + MaxKey: &tqo.MaxVal, } // Convert TagProjection to slice format if needed @@ -150,14 +152,6 @@ func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.S req.TagProjection = []model.TagProjection{*tqo.TagProjection} } - // Set key range based on time range - // if tqo.TimeRange != nil { - // minKey := tqo.TimeRange.Start.UnixNano() - // maxKey := tqo.TimeRange.End.UnixNano() - // req.MinKey = &minKey - // req.MaxKey = &maxKey - // } - // Use the provided series IDs for targeted querying if len(seriesIDs) > 0 { req.SeriesIDs = seriesIDs diff --git a/pkg/query/logical/trace/schema.go b/pkg/query/logical/trace/schema.go index 69057712..59c2d54e 100644 --- a/pkg/query/logical/trace/schema.go +++ b/pkg/query/logical/trace/schema.go @@ -82,8 +82,8 @@ func (s *schema) EntityList() []string { return s.common.EntityList } -func (s *schema) IndexDefined(_ string) (bool, *databasev1.IndexRule) { - panic("trace does not support finding index by tag name") +func (s *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { + return s.common.IndexDefined(tagName) } // CreateTagRef create TagRef to the given tags. diff --git a/pkg/query/logical/trace/trace_analyzer.go b/pkg/query/logical/trace/trace_analyzer.go index 7830a104..5544ae60 100644 --- a/pkg/query/logical/trace/trace_analyzer.go +++ b/pkg/query/logical/trace/trace_analyzer.go @@ -33,17 +33,29 @@ const defaultLimit uint32 = 20 // Analyze converts logical expressions to executable operation tree represented by Plan. func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss []logical.Schema, - ecc []executor.TraceExecutionContext, traceIDTagName string, + ecc []executor.TraceExecutionContext, traceIDTagName, timestampTagName string, ) (logical.Plan, error) { // parse fields if len(metadata) != len(ss) { return nil, fmt.Errorf("number of schemas %d not equal to number of metadata %d", len(ss), len(metadata)) } + var orderByTag string + if criteria.OrderBy != nil { + indexRuleName := criteria.OrderBy.IndexRuleName + ok, indexRule := ss[0].IndexRuleDefined(indexRuleName) + if !ok { + return nil, fmt.Errorf("index rule %s not found", indexRuleName) + } + ot := indexRule.Tags[len(indexRule.Tags)-1] + if ot != timestampTagName { + orderByTag = ot + } + } var plan logical.UnresolvedPlan var s logical.Schema tagProjection := convertStringProjectionToTags(criteria.GetTagProjection()) if len(metadata) == 1 { - plan = parseTraceTags(criteria, metadata[0], ecc[0], tagProjection, traceIDTagName) + plan = parseTraceTags(criteria, metadata[0], ecc[0], tagProjection, traceIDTagName, orderByTag) s = ss[0] } else { var err error @@ -208,7 +220,7 @@ func newTraceLimit(input logical.UnresolvedPlan, offset, num uint32) logical.Unr } func parseTraceTags(criteria *tracev1.QueryRequest, metadata *commonv1.Metadata, - ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag, traceIDTagName string, + ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag, traceIDTagName, orderByTag string, ) logical.UnresolvedPlan { timeRange := criteria.GetTimeRange() return &unresolvedTraceTagFilter{ @@ -219,6 +231,7 @@ func parseTraceTags(criteria *tracev1.QueryRequest, metadata *commonv1.Metadata, projectionTags: tagProjection, ec: ec, traceIDTagName: traceIDTagName, + orderByTag: orderByTag, } } diff --git a/pkg/query/logical/trace/trace_plan_local.go b/pkg/query/logical/trace/trace_plan_local.go index 729fe800..93862f32 100644 --- a/pkg/query/logical/trace/trace_plan_local.go +++ b/pkg/query/logical/trace/trace_plan_local.go @@ -97,6 +97,8 @@ func (i *localScan) Execute(ctx context.Context) (iter.Iterator[model.TraceResul Entities: i.entities, MaxTraceSize: i.maxTraceSize, TraceIDs: i.traceIDs, + MinVal: i.minVal, + MaxVal: i.maxVal, }); err != nil { return iter.Empty[model.TraceResult](), err } diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index 39fa8704..dadb1cd4 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -69,6 +69,10 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er if err != nil { return nil, err } + if uis.orderByTag == "" { + minVal = uis.startTime.UnixNano() + maxVal = uis.endTime.UnixNano() + } ctx.entities = entities // Initialize projectionTags even if no explicit projection tags are provided diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 6a504bc1..2b403522 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -325,6 +325,8 @@ type TraceQueryOptions struct { Entities [][]*modelv1.TagValue TraceIDs []string MaxTraceSize int + MinVal int64 + MaxVal int64 } // Reset resets the TraceQueryOptions.
