This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3f33966d2cde579745eb6dc29d808802b66736f3 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Aug 29 22:16:37 2025 +0800 Refactor trace query and filtering logic to streamline tag handling and improve error management. Remove redundant checks for tag projections and enhance the BuildTagFilter function to accept a global tag name. Update related components to ensure consistency in trace ID processing and filtering behavior. --- banyand/trace/block.go | 3 --- banyand/trace/query.go | 3 --- pkg/query/logical/stream/stream_plan_tag_filter.go | 2 +- pkg/query/logical/tag_filter.go | 11 +++++--- pkg/query/logical/trace/index_filter.go | 6 ++--- pkg/query/logical/trace/schema.go | 2 +- pkg/query/logical/trace/trace_plan_local.go | 6 ++++- pkg/query/logical/trace/trace_plan_tag_filter.go | 31 +++++++++------------- test/cases/trace/data/input/all.yml | 6 +++-- 9 files changed, 32 insertions(+), 38 deletions(-) diff --git a/banyand/trace/block.go b/banyand/trace/block.go index 48dd3c30..6bfa4c54 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -451,9 +451,6 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { } } } - if len(t) == 0 { - return false - } bc.bm.tags = t tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm) diff --git a/banyand/trace/query.go b/banyand/trace/query.go index dea2e827..bfc334de 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -64,9 +64,6 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T if tqo.TimeRange == nil { return nil, errors.New("invalid query options: timeRange are required") } - if tqo.TagProjection == nil || len(tqo.TagProjection.Names) == 0 { - return nil, errors.New("invalid query options: tagProjection is required") - } var tsdb storage.TSDB[*tsTable, option] var err error db := t.tsdb.Load() diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index 69834adb..36a3e040 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -83,7 +83,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) ctx.projectionTags = projTags plan := uis.selectIndexScanner(ctx, uis.ec) if uis.criteria != nil { - tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(ctx.globalConditions) > 1) + tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(ctx.globalConditions) > 1, "") if errFilter != nil { return nil, errFilter } diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index 2433fa70..364b199d 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -76,11 +76,11 @@ type TagFilter interface { // BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support. func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) { - return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false) + return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false, "") } // BuildTagFilter returns a TagFilter. -func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) { +func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string) (TagFilter, error) { if criteria == nil { return DummyFilter, nil } @@ -97,14 +97,17 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index if _, ok := entityDict[cond.Name]; ok { return DummyFilter, nil } + if cond.Name == globalTagName { + return DummyFilter, nil + } return parseFilter(cond, expr, indexChecker) case *modelv1.Criteria_Le: le := criteria.GetLe() - left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex) + left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex, globalTagName) if err != nil { return nil, err } - right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex) + right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex, globalTagName) if err != nil { return nil, err } diff --git a/pkg/query/logical/trace/index_filter.go b/pkg/query/logical/trace/index_filter.go index da60c9d4..c1213453 100644 --- a/pkg/query/logical/trace/index_filter.go +++ b/pkg/query/logical/trace/index_filter.go @@ -43,16 +43,14 @@ func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: cond := criteria.GetCondition() - // Collect the tag name - collectedTagNames = append(collectedTagNames, cond.Name) - // Check if the tag name exists in the allowed tag names if !tagNames[cond.Name] { return nil, nil, collectedTagNames, traceIDs, errors.Errorf("tag name '%s' not found in trace schema", cond.Name) } - // Extract trace IDs if this condition is for the trace ID tag if cond.Name == traceIDTagName && (cond.Op == modelv1.Condition_BINARY_OP_EQ || cond.Op == modelv1.Condition_BINARY_OP_IN) { traceIDs = extractTraceIDsFromCondition(cond) + } else { + collectedTagNames = append(collectedTagNames, cond.Name) } _, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) diff --git a/pkg/query/logical/trace/schema.go b/pkg/query/logical/trace/schema.go index 04c39dc8..624686c8 100644 --- a/pkg/query/logical/trace/schema.go +++ b/pkg/query/logical/trace/schema.go @@ -75,7 +75,7 @@ func (s *schema) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error } func (s *schema) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) { - panic("trace does not support index rule") + return false, &databasev1.IndexRule{} } func (s *schema) EntityList() []string { diff --git a/pkg/query/logical/trace/trace_plan_local.go b/pkg/query/logical/trace/trace_plan_local.go index 2135ec3e..eaa1785d 100644 --- a/pkg/query/logical/trace/trace_plan_local.go +++ b/pkg/query/logical/trace/trace_plan_local.go @@ -97,7 +97,11 @@ func (i *localScan) Execute(ctx context.Context) (model.TraceResult, error) { if i.result == nil { return model.TraceResult{}, nil } - return *i.result.Pull(), nil + traceResult := i.result.Pull() + if traceResult == nil { + return model.TraceResult{}, nil + } + return *traceResult, nil } func (i *localScan) String() string { diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index c4cfbb4b..9addb100 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -70,6 +70,9 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er if len(uis.projectionTags) > 0 { for i := range uis.projectionTags { for _, tag := range uis.projectionTags[i] { + if tag.GetTagName() == uis.traceIDTagName { + continue + } ctx.projectionTags.Names = append(ctx.projectionTags.Names, tag.GetTagName()) } } @@ -93,7 +96,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er } plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs) if uis.criteria != nil { - tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(ctx.globalConditions) > 1) + tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(traceIDs) > 1, uis.traceIDTagName) if errFilter != nil { return nil, errFilter } @@ -120,17 +123,15 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext } type traceAnalyzeContext struct { - s logical.Schema - skippingFilter index.Filter - projectionTags *model.TagProjection - globalConditions []interface{} - projTagsRefs [][]*logical.TagRef + s logical.Schema + skippingFilter index.Filter + projectionTags *model.TagProjection + projTagsRefs [][]*logical.TagRef } func newTraceAnalyzerContext(s logical.Schema) *traceAnalyzeContext { return &traceAnalyzeContext{ - globalConditions: make([]interface{}, 0), - s: s, + s: s, } } @@ -193,20 +194,12 @@ func (t *traceTagFilterPlan) Execute(ctx context.Context) (model.TraceResult, er return model.TraceResult{}, err } - // If there are no tags, no need to check the filter - if len(result.Tags) == 0 { + // If there are no spans, no need to check the filter + if len(result.Spans) == 0 { return model.TraceResult{}, nil } - // For trace, we need to check if ANY row matches the tag filter - // Since result.Tags is column-based, we need to check each row (combination of tag values at same index) - // First, determine the number of rows by finding the maximum length of Values slices - maxRows := 0 - for _, tag := range result.Tags { - if len(tag.Values) > maxRows { - maxRows = len(tag.Values) - } - } + maxRows := len(result.Spans) // Check each row to see if any matches the filter for rowIdx := 0; rowIdx < maxRows; rowIdx++ { diff --git a/test/cases/trace/data/input/all.yml b/test/cases/trace/data/input/all.yml index 301729e6..689aac12 100644 --- a/test/cases/trace/data/input/all.yml +++ b/test/cases/trace/data/input/all.yml @@ -21,5 +21,7 @@ tag_projection: ["trace_id"] criteria: condition: name: "trace_id" - op: "eq" - value: "1" + op: "BINARY_OP_EQ" + value: + str: + value: "1"