This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2a9eb8e75184eabf637891bb50a325d30d86bfff Author: Gao Hongtao <[email protected]> AuthorDate: Sun Aug 31 21:21:46 2025 +0800 Implement timestamp literal handling in query logic, enhancing expression parsing and filter building for trace queries. Introduced min/max value extraction for orderByTag in trace filters, improving query optimization and performance. Updated related functions to support new timestamp functionality. --- pkg/query/logical/expr_literal.go | 84 +++++++ pkg/query/logical/parser.go | 10 + pkg/query/logical/trace/index_filter.go | 268 ++++++++++++++++------- pkg/query/logical/trace/trace_plan_local.go | 2 + pkg/query/logical/trace/trace_plan_tag_filter.go | 25 ++- 5 files changed, 307 insertions(+), 82 deletions(-) diff --git a/pkg/query/logical/expr_literal.go b/pkg/query/logical/expr_literal.go index c22bd25c..9b47e08a 100644 --- a/pkg/query/logical/expr_literal.go +++ b/pkg/query/logical/expr_literal.go @@ -24,6 +24,7 @@ import ( "strings" "golang.org/x/exp/slices" + "google.golang.org/protobuf/types/known/timestamppb" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/convert" @@ -444,3 +445,86 @@ func (s nullLiteral) String() string { func (s nullLiteral) Elements() []string { return []string{"null"} } + +var ( + _ LiteralExpr = (*timestampLiteral)(nil) + _ ComparableExpr = (*timestampLiteral)(nil) +) + +type timestampLiteral struct { + timestamp *timestamppb.Timestamp +} + +func (t *timestampLiteral) Field(key index.FieldKey) index.Field { + return index.NewIntField(key, t.timestamp.AsTime().UnixNano()) +} + +func (t *timestampLiteral) RangeOpts(isUpper bool, includeLower bool, includeUpper bool) index.RangeOpts { + nanos := t.timestamp.AsTime().UnixNano() + if isUpper { + return index.NewIntRangeOpts(math.MinInt64, nanos, includeLower, includeUpper) + } + return index.NewIntRangeOpts(nanos, math.MaxInt64, includeLower, includeUpper) +} + +func (t 
*timestampLiteral) SubExprs() []LiteralExpr { + return []LiteralExpr{t} +} + +func newTimestampLiteral(ts *timestamppb.Timestamp) *timestampLiteral { + return &timestampLiteral{ + timestamp: ts, + } +} + +func (t *timestampLiteral) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*timestampLiteral); ok { + thisNanos := t.timestamp.AsTime().UnixNano() + otherNanos := o.timestamp.AsTime().UnixNano() + return int(thisNanos - otherNanos), true + } + if o, ok := other.(*int64Literal); ok { + thisNanos := t.timestamp.AsTime().UnixNano() + return int(thisNanos - o.int64), true + } + return 0, false +} + +func (t *timestampLiteral) Contains(other LiteralExpr) bool { + if o, ok := other.(*timestampLiteral); ok { + return t.timestamp.AsTime().UnixNano() == o.timestamp.AsTime().UnixNano() + } + if o, ok := other.(*int64Literal); ok { + return t.timestamp.AsTime().UnixNano() == o.int64 + } + return false +} + +func (t *timestampLiteral) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*timestampLiteral); ok { + return t.timestamp.AsTime().UnixNano() == o.timestamp.AsTime().UnixNano() + } + if o, ok := other.(*int64Literal); ok { + return t.timestamp.AsTime().UnixNano() == o.int64 + } + return false +} + +func (t *timestampLiteral) Bytes() [][]byte { + return [][]byte{convert.Int64ToBytes(t.timestamp.AsTime().UnixNano())} +} + +func (t *timestampLiteral) Equal(expr Expr) bool { + if other, ok := expr.(*timestampLiteral); ok { + return t.timestamp.AsTime().UnixNano() == other.timestamp.AsTime().UnixNano() + } + return false +} + +func (t *timestampLiteral) String() string { + return strconv.FormatInt(t.timestamp.AsTime().UnixNano(), 10) +} + +func (t *timestampLiteral) Elements() []string { + return []string{strconv.FormatInt(t.timestamp.AsTime().UnixNano(), 10)} +} diff --git a/pkg/query/logical/parser.go b/pkg/query/logical/parser.go index 1766b7e1..4687c5b2 100644 --- a/pkg/query/logical/parser.go +++ b/pkg/query/logical/parser.go @@ -85,6 +85,14 @@ func 
ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, co return newInt64ArrLiteral(v.IntArray.GetValue()), nil, nil case *modelv1.TagValue_Null: return newNullLiteral(), nil, nil + case *modelv1.TagValue_Timestamp: + if ok { + parsedEntity := make([]*modelv1.TagValue, len(entity)) + copy(parsedEntity, entity) + parsedEntity[entityIdx] = cond.Value + return nil, [][]*modelv1.TagValue{parsedEntity}, nil + } + return newTimestampLiteral(v.Timestamp), nil, nil } return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, "index filter parses %v", cond) } @@ -102,6 +110,8 @@ func ParseExpr(cond *modelv1.Condition) (LiteralExpr, error) { return newInt64ArrLiteral(v.IntArray.GetValue()), nil case *modelv1.TagValue_Null: return newNullLiteral(), nil + case *modelv1.TagValue_Timestamp: + return newTimestampLiteral(v.Timestamp), nil } return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "condition parses %v", cond) } diff --git a/pkg/query/logical/trace/index_filter.go b/pkg/query/logical/trace/index_filter.go index c1213453..a42161b7 100644 --- a/pkg/query/logical/trace/index_filter.go +++ b/pkg/query/logical/trace/index_filter.go @@ -18,6 +18,8 @@ package trace import ( + "math" + "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" @@ -31,88 +33,22 @@ import ( // Unlike stream queries, trace doesn't need indexRuleType parameter. // Trace conditions are either entity or skipping index. // Trace creates explicit index rules for skipping index on all tags that don't belong to entity. +// Returns min/max int64 values for the orderByTag if provided, otherwise returns math.MaxInt64, math.MinInt64. 
func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, - entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName string, -) (index.Filter, [][]*modelv1.TagValue, []string, []string, error) { + entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName string, orderByTag string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { if criteria == nil { - return nil, [][]*modelv1.TagValue{entity}, nil, nil, nil + return nil, [][]*modelv1.TagValue{entity}, nil, nil, math.MaxInt64, math.MinInt64, nil } - var collectedTagNames []string - var traceIDs []string switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: - cond := criteria.GetCondition() - if !tagNames[cond.Name] { - return nil, nil, collectedTagNames, traceIDs, errors.Errorf("tag name '%s' not found in trace schema", cond.Name) - } - - if cond.Name == traceIDTagName && (cond.Op == modelv1.Condition_BINARY_OP_EQ || cond.Op == modelv1.Condition_BINARY_OP_IN) { - traceIDs = extractTraceIDsFromCondition(cond) - } else { - collectedTagNames = append(collectedTagNames, cond.Name) - } - - _, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) - if err != nil { - return nil, nil, collectedTagNames, traceIDs, err - } - if parsedEntity != nil { - return nil, parsedEntity, collectedTagNames, traceIDs, nil - } - // For trace, all non-entity tags have skipping index - expr, _, err := logical.ParseExprOrEntity(entityDict, entity, cond) - if err != nil { - return nil, nil, collectedTagNames, traceIDs, err - } - filter, entities, err := parseConditionToFilter(cond, entity, expr) - return filter, entities, collectedTagNames, traceIDs, err + return buildFilterFromCondition(criteria.GetCondition(), tagNames, entityDict, entity, traceIDTagName, orderByTag) case *modelv1.Criteria_Le: - le := criteria.GetLe() - if le.GetLeft() == nil && le.GetRight() == nil { - return nil, nil, nil, traceIDs, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) - } - if le.GetLeft() == nil { - return buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName) - } - if le.GetRight() == nil { - return buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName) - } - left, leftEntities, leftTagNames, leftTraceIDs, err := buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName) - if err != nil { - return nil, nil, leftTagNames, leftTraceIDs, err - } - right, rightEntities, rightTagNames, rightTraceIDs, err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName) - if err != nil { - return nil, nil, append(leftTagNames, rightTagNames...), append(leftTraceIDs, rightTraceIDs...), err - } - - // Merge tag names from both sides - collectedTagNames = append(collectedTagNames, leftTagNames...) - collectedTagNames = append(collectedTagNames, rightTagNames...) - - // Merge trace IDs from both sides - traceIDs = append(traceIDs, leftTraceIDs...) - traceIDs = append(traceIDs, rightTraceIDs...) 
- - entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities) - if entities == nil { - return nil, nil, collectedTagNames, traceIDs, nil - } - if left == nil { - return right, entities, collectedTagNames, traceIDs, nil - } - if right == nil { - return left, entities, collectedTagNames, traceIDs, nil - } - switch le.Op { - case modelv1.LogicalExpression_LOGICAL_OP_AND: - return &traceAndFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, nil - case modelv1.LogicalExpression_LOGICAL_OP_OR: - return &traceOrFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, nil - } + return buildFilterFromLogicalExpression(criteria.GetLe(), tagNames, entityDict, entity, traceIDTagName, orderByTag) } - return nil, nil, nil, traceIDs, logical.ErrInvalidCriteriaType + + return nil, nil, nil, nil, math.MaxInt64, math.MinInt64, logical.ErrInvalidCriteriaType } func parseConditionToFilter(cond *modelv1.Condition, entity []*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, [][]*modelv1.TagValue, error) { @@ -338,3 +274,187 @@ func extractTraceIDsFromCondition(cond *modelv1.Condition) []string { return traceIDs } + +// buildFilterFromCondition handles single condition filtering and min/max extraction. 
+func buildFilterFromCondition(cond *modelv1.Condition, tagNames map[string]bool, entityDict map[string]int, + entity []*modelv1.TagValue, traceIDTagName, orderByTag string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { + var collectedTagNames []string + var traceIDs []string + minVal := int64(math.MaxInt64) + maxVal := int64(math.MinInt64) + + if !tagNames[cond.Name] { + return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, errors.Errorf("tag name '%s' not found in trace schema", cond.Name) + } + + // Extract min/max bounds if this condition matches orderByTag + if cond.Name == orderByTag { + condMin, condMax := extractBoundsFromCondition(cond) + if condMin != math.MaxInt64 { + minVal = condMin + } + if condMax != math.MinInt64 { + maxVal = condMax + } + } + + if cond.Name == traceIDTagName && (cond.Op == modelv1.Condition_BINARY_OP_EQ || cond.Op == modelv1.Condition_BINARY_OP_IN) { + traceIDs = extractTraceIDsFromCondition(cond) + } else { + collectedTagNames = append(collectedTagNames, cond.Name) + } + + _, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, err + } + if parsedEntity != nil { + return nil, parsedEntity, collectedTagNames, traceIDs, minVal, maxVal, nil + } + // For trace, all non-entity tags have skipping index + expr, _, err := logical.ParseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, err + } + filter, entities, err := parseConditionToFilter(cond, entity, expr) + return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err +} + +// buildFilterFromLogicalExpression handles logical expression (AND/OR) filtering and min/max extraction. 
+func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames map[string]bool, entityDict map[string]int, + entity []*modelv1.TagValue, traceIDTagName, orderByTag string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { + var collectedTagNames []string + var traceIDs []string + minVal := int64(math.MaxInt64) + maxVal := int64(math.MinInt64) + + if le.GetLeft() == nil && le.GetRight() == nil { + return nil, nil, nil, traceIDs, minVal, maxVal, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", le) + } + if le.GetLeft() == nil { + return buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag) + } + if le.GetRight() == nil { + return buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag) + } + + left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err := buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag) + if err != nil { + return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err + } + right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag) + if err != nil { + return nil, nil, append(leftTagNames, rightTagNames...), append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err + } + + // Merge tag names from both sides + collectedTagNames = append(collectedTagNames, leftTagNames...) + collectedTagNames = append(collectedTagNames, rightTagNames...) + + // Merge trace IDs from both sides + traceIDs = append(traceIDs, leftTraceIDs...) + traceIDs = append(traceIDs, rightTraceIDs...) 
+ + // Merge min/max values based on logical operation + finalMin, finalMax := mergeMinMaxBounds(le.Op, leftMin, leftMax, rightMin, rightMax) + + entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities) + if entities == nil { + return nil, nil, collectedTagNames, traceIDs, finalMin, finalMax, nil + } + if left == nil { + return right, entities, collectedTagNames, traceIDs, finalMin, finalMax, nil + } + if right == nil { + return left, entities, collectedTagNames, traceIDs, finalMin, finalMax, nil + } + switch le.Op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + return &traceAndFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, finalMin, finalMax, nil + case modelv1.LogicalExpression_LOGICAL_OP_OR: + return &traceOrFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, finalMin, finalMax, nil + } + return nil, nil, collectedTagNames, traceIDs, finalMin, finalMax, logical.ErrInvalidCriteriaType +} + +// mergeMinMaxBounds merges min/max bounds based on logical operation. +func mergeMinMaxBounds(op modelv1.LogicalExpression_LogicalOp, leftMin, leftMax, rightMin, rightMax int64) (int64, int64) { + switch op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + // Intersection - take tighter bounds + finalMin := leftMin + if rightMin > finalMin { + finalMin = rightMin + } + finalMax := leftMax + if rightMax < finalMax { + finalMax = rightMax + } + return finalMin, finalMax + case modelv1.LogicalExpression_LOGICAL_OP_OR: + // Union - take wider bounds + finalMin := leftMin + if rightMin < finalMin { + finalMin = rightMin + } + finalMax := leftMax + if rightMax > finalMax { + finalMax = rightMax + } + return finalMin, finalMax + } + return leftMin, leftMax +} + +// extractBoundsFromCondition extracts bounds from a single condition. 
+func extractBoundsFromCondition(cond *modelv1.Condition) (int64, int64) { + if cond.Value == nil || cond.Value.Value == nil { + return math.MaxInt64, math.MinInt64 + } + + var value int64 + + switch v := cond.Value.Value.(type) { + case *modelv1.TagValue_Int: + if v.Int != nil { + value = v.Int.Value + } else { + return math.MaxInt64, math.MinInt64 + } + case *modelv1.TagValue_Timestamp: + if v.Timestamp != nil { + value = v.Timestamp.AsTime().UnixNano() + } else { + return math.MaxInt64, math.MinInt64 + } + default: + // Only support int64 and timestamp for range operations + return math.MaxInt64, math.MinInt64 + } + + switch cond.Op { + case modelv1.Condition_BINARY_OP_GT: + // value > X means min is X+1, max is unbounded + if value < math.MaxInt64 { + return value + 1, math.MinInt64 + } + return math.MaxInt64, math.MinInt64 + case modelv1.Condition_BINARY_OP_GE: + // value >= X means min is X, max is unbounded + return value, math.MinInt64 + case modelv1.Condition_BINARY_OP_LT: + // value < X means min is unbounded, max is X-1 + if value > math.MinInt64 { + return math.MaxInt64, value - 1 + } + return math.MaxInt64, math.MinInt64 + case modelv1.Condition_BINARY_OP_LE: + // value <= X means min is unbounded, max is X + return math.MaxInt64, value + default: + // Non-range operations don't contribute bounds + return math.MaxInt64, math.MinInt64 + } +} diff --git a/pkg/query/logical/trace/trace_plan_local.go b/pkg/query/logical/trace/trace_plan_local.go index 71aebf07..729fe800 100644 --- a/pkg/query/logical/trace/trace_plan_local.go +++ b/pkg/query/logical/trace/trace_plan_local.go @@ -53,6 +53,8 @@ type localScan struct { entities [][]*modelv1.TagValue traceIDs []string maxTraceSize int + minVal int64 + maxVal int64 } func (i *localScan) Close() { diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index b6086044..39fa8704 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ 
b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -20,6 +20,7 @@ package trace import ( "context" "fmt" + "math" "time" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -43,6 +44,7 @@ type unresolvedTraceTagFilter struct { metadata *commonv1.Metadata criteria *modelv1.Criteria traceIDTagName string + orderByTag string projectionTags [][]*logical.Tag } @@ -60,8 +62,10 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er var conditionTagNames []string var traceIDs []string var entities [][]*modelv1.TagValue + var minVal, maxVal int64 // For trace, we use skipping filter and capture entities for query optimization - ctx.skippingFilter, entities, conditionTagNames, traceIDs, err = buildTraceFilter(uis.criteria, s, entityDict, entity, uis.traceIDTagName) + ctx.skippingFilter, entities, conditionTagNames, traceIDs, minVal, maxVal, err = buildTraceFilter( + uis.criteria, s, entityDict, entity, uis.traceIDTagName, uis.orderByTag) if err != nil { return nil, err } @@ -101,7 +105,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er return nil, errProject } } - plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs) + plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal) if uis.criteria != nil { tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(traceIDs) > 1, uis.traceIDTagName) if errFilter != nil { @@ -115,7 +119,9 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er return plan, err } -func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext, ec executor.TraceExecutionContext, traceIDs []string) logical.Plan { +func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext, + ec executor.TraceExecutionContext, traceIDs []string, minVal, maxVal int64, +) logical.Plan { return &localScan{ timeRange: timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime), 
schema: ctx.s, @@ -127,6 +133,8 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext l: logger.GetLogger("query", "trace", "local-scan"), ec: ec, traceIDs: traceIDs, + minVal: minVal, + maxVal: maxVal, } } @@ -159,11 +167,12 @@ func deduplicateStrings(strings []string) []string { // buildTraceFilter builds a filter for trace queries and returns both the filter and collected tag names. // Unlike stream, trace only needs skipping filter. +// Returns min/max int64 values for the orderByTag if provided, otherwise returns math.MaxInt64, math.MinInt64. func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict map[string]int, - entity []*modelv1.TagValue, traceIDTagName string, -) (index.Filter, [][]*modelv1.TagValue, []string, []string, error) { + entity []*modelv1.TagValue, traceIDTagName string, orderByTag string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { if criteria == nil { - return nil, [][]*modelv1.TagValue{entity}, nil, nil, nil + return nil, [][]*modelv1.TagValue{entity}, nil, nil, math.MaxInt64, math.MinInt64, nil } // Create a map of valid tag names from the schema tagNames := make(map[string]bool) @@ -172,8 +181,8 @@ func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict m tagNames[tagName] = true } - filter, entities, collectedTagNames, traceIDs, err := buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName) - return filter, entities, collectedTagNames, traceIDs, err + filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName, orderByTag) + return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err } var (
