This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/condition_in in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3b0c3d4832a555b7d3d029fd4c772d48e50ca537 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Sep 19 10:59:04 2025 +0800 Add new test cases for Zipkin trace with query tag support This commit introduces two new YAML files for testing: `having_query_tag.yml` in both input and expected output directories. The input file defines a trace with specific criteria for querying by tags, while the output file contains the expected trace structure, including spans and associated tags. These additions enhance the testing capabilities for trace queries involving the 'query' tag. --- banyand/internal/sidx/introducer.go | 2 +- banyand/internal/sidx/snapshot.go | 71 ++-------------------- banyand/internal/sidx/snapshot_test.go | 17 ------ pkg/query/logical/stream/index_filter.go | 20 +++++- pkg/query/logical/stream/stream_plan_tag_filter.go | 2 +- pkg/query/logical/tag_filter.go | 27 ++++++-- pkg/query/logical/trace/index_filter.go | 37 +++++++---- pkg/query/logical/trace/trace_plan_tag_filter.go | 4 +- .../trace/testdata/index_rule_bindings/zipkin.json | 4 +- .../testdata/index_rules/zipkin-duration.json | 14 ----- .../trace/testdata/index_rules/zipkin-service.json | 14 ----- pkg/test/trace/testdata/traces/zipkin.json | 12 ++-- test/cases/init.go | 2 +- test/cases/stream/data/input/err_in_arr.yaml | 32 ++++++++++ test/cases/stream/stream.go | 1 + test/cases/trace/data/input/err_in_arr.yml | 50 +++++++++++++++ test/cases/trace/data/input/having_query_tag.yml | 50 +++++++++++++++ test/cases/trace/data/testdata/zipkin.json | 41 +++++++++++++ test/cases/trace/data/want/having_query_tag.yml | 61 +++++++++++++++++++ test/cases/trace/trace.go | 2 + 20 files changed, 322 insertions(+), 141 deletions(-) diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go index b3801a65..3088b186 100644 --- a/banyand/internal/sidx/introducer.go +++ b/banyand/internal/sidx/introducer.go @@ -147,7 +147,7 @@ func (s *sidx) introduceMemPart(nextIntroduction *introduction, epoch uint64) { if cur != nil { defer cur.decRef() } else { - cur = generateSnapshot() + cur = &snapshot{} } next := nextIntroduction.memPart diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go index e06fcc41..d6612503 100644 --- a/banyand/internal/sidx/snapshot.go +++ b/banyand/internal/sidx/snapshot.go @@ -27,7 +27,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/pool" ) const ( @@ -46,19 +45,15 @@ type snapshot struct { // ref is the atomic reference counter for safe concurrent access ref int32 - - // released tracks if this snapshot has been released - released atomic.Bool } // newSnapshot creates a new snapshot with the given parts and epoch. // The snapshot starts with a reference count of 1. func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot { - s := generateSnapshot() + s := &snapshot{} s.parts = append(s.parts[:0], parts...) s.epoch = epoch s.ref = 1 - s.released.Store(false) // Acquire references to all parts to ensure they remain valid for _, pw := range s.parts { @@ -77,25 +72,7 @@ func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot { // acquire increments the snapshot reference count. // Returns true if successful, false if snapshot has been released. func (s *snapshot) acquire() bool { - if s.released.Load() { - return false - } - - for { - oldRef := atomic.LoadInt32(&s.ref) - if oldRef <= 0 { - return false - } - - if atomic.CompareAndSwapInt32(&s.ref, oldRef, oldRef+1) { - // Double-check that snapshot wasn't released during acquire - if s.released.Load() { - s.release() - return false - } - return true - } - } + return atomic.AddInt32(&s.ref, 1) > 0 } // decRef decrements the snapshot reference count (helper for snapshot interface). @@ -118,11 +95,6 @@ func (s *snapshot) release() { Msg("snapshot reference count went negative") return } - - // Mark as released first - s.released.Store(true) - // Return to pool - releaseSnapshot(s) } // getParts returns parts that potentially contain data within the specified key range. @@ -190,17 +162,8 @@ func (s *snapshot) refCount() int32 { return atomic.LoadInt32(&s.ref) } -// isReleased returns true if the snapshot has been released. -func (s *snapshot) isReleased() bool { - return s.released.Load() -} - // validate checks snapshot consistency and part availability. func (s *snapshot) validate() error { - if s.released.Load() { - return fmt.Errorf("snapshot has been released") - } - if atomic.LoadInt32(&s.ref) <= 0 { return fmt.Errorf("snapshot has zero or negative reference count") } @@ -280,7 +243,6 @@ func (s *snapshot) reset() { s.parts = s.parts[:0] s.epoch = 0 s.ref = 0 - s.released.Store(false) } // String returns a string representation of the snapshot. @@ -329,34 +291,12 @@ func mustReadSnapshot(fileSystem fs.FileSystem, root string, snapshot uint64) [] return result } -// Pool for snapshot reuse. -var snapshotPool = pool.Register[*snapshot]("sidx-snapshot") - -// generateSnapshot gets a snapshot from the pool or creates a new one. -func generateSnapshot() *snapshot { - v := snapshotPool.Get() - if v == nil { - return &snapshot{} - } - return v -} - -// releaseSnapshot returns a snapshot to the pool after reset. -func releaseSnapshot(s *snapshot) { - if s == nil { - return - } - s.reset() - snapshotPool.Put(s) -} - // copyAllTo creates a new snapshot with all parts from current snapshot. func (s *snapshot) copyAllTo(epoch uint64) *snapshot { var result snapshot result.parts = make([]*partWrapper, len(s.parts)) result.epoch = epoch result.ref = 1 - result.released.Store(false) // Copy all parts and acquire references copy(result.parts, s.parts) @@ -379,8 +319,9 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) *s result.parts = append(result.parts, n) continue } - s.parts[i].acquire() - result.parts = append(result.parts, s.parts[i]) + if s.parts[i].acquire() { + result.parts = append(result.parts, s.parts[i]) + } } return &result } @@ -390,7 +331,6 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot var result snapshot result.epoch = epoch result.ref = 1 - result.released.Store(false) // Copy parts except those being removed for _, pw := range s.parts { @@ -399,6 +339,7 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot result.parts = append(result.parts, pw) } } + pw.markForRemoval() } return &result diff --git a/banyand/internal/sidx/snapshot_test.go b/banyand/internal/sidx/snapshot_test.go index 9029edaf..3322a459 100644 --- a/banyand/internal/sidx/snapshot_test.go +++ b/banyand/internal/sidx/snapshot_test.go @@ -55,10 +55,6 @@ func TestSnapshot_Creation(t *testing.T) { if snapshot.getPartCount() != len(parts) { t.Errorf("expected part count %d, got %d", len(parts), snapshot.getPartCount()) } - - if snapshot.isReleased() { - t.Error("snapshot should not be released") - } } func TestSnapshot_ReferenceCountingBasic(t *testing.T) { @@ -81,18 +77,8 @@ func TestSnapshot_ReferenceCountingBasic(t *testing.T) { t.Errorf("expected ref count 1, got %d", snapshot.refCount()) } - // Check state before final release - isReleasedBefore := snapshot.isReleased() - if isReleasedBefore { - t.Error("snapshot should not be released before final release") - } - // Final release should clean up snapshot.release() - - // After final release, the snapshot object may be reset and returned to pool - // so we can't reliably check its state. The important thing is that it - // doesn't crash and the cleanup happens properly. } func TestSnapshot_ReferenceCountingConcurrent(t *testing.T) { @@ -349,9 +335,6 @@ func TestSnapshot_PoolReuse(t *testing.T) { if snapshot2.refCount() != 1 { t.Errorf("expected ref count 1, got %d", snapshot2.refCount()) } - if snapshot2.isReleased() { - t.Error("new snapshot should not be released") - } } // Helper types and functions. diff --git a/pkg/query/logical/stream/index_filter.go b/pkg/query/logical/stream/index_filter.go index 8cce87ad..89a715e8 100644 --- a/pkg/query/logical/stream/index_filter.go +++ b/pkg/query/logical/stream/index_filter.go @@ -50,7 +50,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, return nil, parsedEntity, nil } if ok, indexRule := schema.IndexDefined(cond.Name); ok && indexRule.Type == indexRuleType { - return parseConditionToFilter(cond, indexRule, expr, entity) + return parseConditionToFilter(cond, indexRule, expr, entity, schema) } return ENode, [][]*modelv1.TagValue{entity}, nil case *modelv1.Criteria_Le: @@ -103,7 +103,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, } func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.IndexRule, - expr logical.LiteralExpr, entity []*modelv1.TagValue, + expr logical.LiteralExpr, entity []*modelv1.TagValue, schema logical.Schema, ) (index.Filter, [][]*modelv1.TagValue, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: @@ -124,6 +124,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index case modelv1.Condition_BINARY_OP_NE: return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_HAVING: + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec == nil { + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v for skipping index", cond) + } ee := expr.SubExprs() l := len(ee) if l < 1 { @@ -135,6 +139,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index } return and, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_HAVING: + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec == nil { + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v for skipping index", cond) + } ee := expr.SubExprs() l := len(ee) if l < 1 { @@ -146,6 +154,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index } return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_IN: + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "in condition is not supported for array type") + } ee := expr.SubExprs() l := len(ee) if l < 1 { @@ -157,6 +169,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index } return or, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_IN: + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "not in condition is not supported for array type") + } ee := expr.SubExprs() l := len(ee) if l < 1 { diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go index 3ab17826..a80e4b6c 100644 --- a/pkg/query/logical/stream/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream/stream_plan_tag_filter.go @@ -83,7 +83,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error) ctx.projectionTags = projTags plan := uis.selectIndexScanner(ctx, uis.ec) if uis.criteria != nil { - tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, false, "") + tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, s, false, "") if errFilter != nil { return nil, errFilter } diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go index cd2b2e2f..9480548c 100644 --- a/pkg/query/logical/tag_filter.go +++ b/pkg/query/logical/tag_filter.go @@ -25,6 +25,7 @@ import ( "github.com/blugelabs/bluge/analysis" "github.com/pkg/errors" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/analyzer" @@ -76,11 +77,13 @@ type TagFilter interface { // BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support. func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) { - return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false, "") + return BuildTagFilter(criteria, nil, nil, emptyIndexChecker{}, false, "") } // BuildTagFilter returns a TagFilter. -func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string) (TagFilter, error) { +func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, + indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string, +) (TagFilter, error) { if criteria == nil { return DummyFilter, nil } @@ -100,14 +103,14 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index if cond.Name == globalTagName { return DummyFilter, nil } - return parseFilter(cond, expr, indexChecker) + return parseFilter(cond, expr, schema, indexChecker) case *modelv1.Criteria_Le: le := criteria.GetLe() - left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex, globalTagName) + left, err := BuildTagFilter(le.Left, entityDict, schema, indexChecker, hasGlobalIndex, globalTagName) if err != nil { return nil, err } - right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex, globalTagName) + right, err := BuildTagFilter(le.Right, entityDict, schema, indexChecker, hasGlobalIndex, globalTagName) if err != nil { return nil, err } @@ -131,7 +134,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index return nil, ErrInvalidCriteriaType } -func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker IndexChecker) (TagFilter, error) { +func parseFilter(cond *modelv1.Condition, expr ComparableExpr, schema Schema, indexChecker IndexChecker) (TagFilter, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: return newRangeTag(cond.Name, rangeOpts{ @@ -162,8 +165,20 @@ func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker Inde case modelv1.Condition_BINARY_OP_NOT_HAVING: return newNotTag(newHavingTag(cond.Name, expr)), nil case modelv1.Condition_BINARY_OP_IN: + if schema != nil { + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "in condition is not supported for array type") + } + } return newInTag(cond.Name, expr), nil case modelv1.Condition_BINARY_OP_NOT_IN: + if schema != nil { + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "not in condition is not supported for array type") + } + } return newNotTag(newInTag(cond.Name, expr)), nil default: return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag filter parses %v", cond) diff --git a/pkg/query/logical/trace/index_filter.go b/pkg/query/logical/trace/index_filter.go index 44fad4d3..6faa950d 100644 --- a/pkg/query/logical/trace/index_filter.go +++ b/pkg/query/logical/trace/index_filter.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" @@ -34,7 +35,7 @@ import ( // Trace conditions are either entity or skipping index. // Trace creates explicit index rules for skipping index on all tags that don't belong to entity. // Returns min/max int64 values for the orderByTag if provided, otherwise returns math.MaxInt64, math.MinInt64. -func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, +func buildFilter(criteria *modelv1.Criteria, schema logical.Schema, tagNames map[string]bool, entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName string, orderByTag string, ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { if criteria == nil { @@ -43,15 +44,17 @@ func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: - return buildFilterFromCondition(criteria.GetCondition(), tagNames, entityDict, entity, traceIDTagName, orderByTag) + return buildFilterFromCondition(criteria.GetCondition(), schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) case *modelv1.Criteria_Le: - return buildFilterFromLogicalExpression(criteria.GetLe(), tagNames, entityDict, entity, traceIDTagName, orderByTag) + return buildFilterFromLogicalExpression(criteria.GetLe(), schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) } return nil, nil, nil, nil, math.MinInt64, math.MaxInt64, logical.ErrInvalidCriteriaType } -func parseConditionToFilter(cond *modelv1.Condition, entity []*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, [][]*modelv1.TagValue, error) { +func parseConditionToFilter(cond *modelv1.Condition, schema logical.Schema, + entity []*modelv1.TagValue, expr logical.LiteralExpr, +) (index.Filter, [][]*modelv1.TagValue, error) { switch cond.Op { case modelv1.Condition_BINARY_OP_GT: return &traceRangeFilter{op: "gt", tagName: cond.Name, cond: cond, expr: expr}, [][]*modelv1.TagValue{entity}, nil @@ -72,8 +75,20 @@ func parseConditionToFilter(cond *modelv1.Condition, entity []*modelv1.TagValue, case modelv1.Condition_BINARY_OP_NOT_HAVING: return &traceFilter{op: "not_having", tagName: cond.Name}, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_IN: + if schema != nil { + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, nil, errors.Errorf("in condition is not supported for array type") + } + } return &traceFilter{op: "in", tagName: cond.Name}, [][]*modelv1.TagValue{entity}, nil case modelv1.Condition_BINARY_OP_NOT_IN: + if schema != nil { + tagSpec := schema.FindTagSpecByName(cond.Name) + if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) { + return nil, nil, errors.Errorf("not in condition is not supported for array type") + } + } return &traceFilter{op: "not_in", tagName: cond.Name}, [][]*modelv1.TagValue{entity}, nil } return nil, nil, errors.Errorf("unsupported condition operation: %v", cond.Op) @@ -276,7 +291,7 @@ func extractTraceIDsFromCondition(cond *modelv1.Condition) []string { } // buildFilterFromCondition handles single condition filtering and min/max extraction. -func buildFilterFromCondition(cond *modelv1.Condition, tagNames map[string]bool, entityDict map[string]int, +func buildFilterFromCondition(cond *modelv1.Condition, schema logical.Schema, tagNames map[string]bool, entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName, orderByTag string, ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { var collectedTagNames []string @@ -317,12 +332,12 @@ func buildFilterFromCondition(cond *modelv1.Condition, tagNames map[string]bool, if err != nil { return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, err } - filter, entities, err := parseConditionToFilter(cond, entity, expr) + filter, entities, err := parseConditionToFilter(cond, schema, entity, expr) return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err } // buildFilterFromLogicalExpression handles logical expression (AND/OR) filtering and min/max extraction. -func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames map[string]bool, entityDict map[string]int, +func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, schema logical.Schema, tagNames map[string]bool, entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName, orderByTag string, ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) { var collectedTagNames []string @@ -334,17 +349,17 @@ func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames ma return nil, nil, nil, traceIDs, minVal, maxVal, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", le) } if le.GetLeft() == nil { - return buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag) + return buildFilter(le.Right, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) } if le.GetRight() == nil { - return buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag) + return buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) } - left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err := buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag) + left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err := buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) if err != nil { return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err } - right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag) + right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, err := buildFilter(le.Right, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag) if err != nil { return nil, nil, append(leftTagNames, rightTagNames...), append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err } diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index 31f2cd4d..270d795d 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -115,7 +115,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er } plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal) if uis.criteria != nil { - tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(traceIDs) > 0, uis.traceIDTagName) + tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName) if errFilter != nil { return nil, errFilter } @@ -189,7 +189,7 @@ func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict m tagNames[tagName] = true } - filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName, orderByTag) + filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName, orderByTag) return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err } diff --git a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json index f1e994d7..cf03fabb 100644 --- a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json +++ b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json @@ -4,9 +4,7 @@ "group": "zipkinTrace" }, "rules": [ - "zipkin-duration", - "zipkin-timestamp", - "zipkin-service" + "zipkin-timestamp" ], "subject": { "catalog": "CATALOG_TRACE", diff --git a/pkg/test/trace/testdata/index_rules/zipkin-duration.json b/pkg/test/trace/testdata/index_rules/zipkin-duration.json deleted file mode 100644 index f8c5ad76..00000000 --- a/pkg/test/trace/testdata/index_rules/zipkin-duration.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "metadata": { - "name": "zipkin-duration", - "group": "zipkinTrace" - }, - "tags": [ - "local_endpoint_service_name", - "operation_name", - "kind", - "duration" - ], - "type": "TYPE_TREE", - "updated_at": "2021-04-15T01:30:15.01Z" -} \ No newline at end of file diff --git a/pkg/test/trace/testdata/index_rules/zipkin-service.json b/pkg/test/trace/testdata/index_rules/zipkin-service.json deleted file mode 100644 index 486d1512..00000000 --- a/pkg/test/trace/testdata/index_rules/zipkin-service.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "metadata": { - "name": "zipkin-service", - "group": "zipkinTrace" - }, - "tags": [ - "local_endpoint_service_name", - "remote_endpoint_service_name", - "operation_name", - "kind" - ], - "type": "TYPE_TREE", - "updated_at": "2021-04-15T01:30:15.01Z" -} \ No newline at end of file diff --git a/pkg/test/trace/testdata/traces/zipkin.json b/pkg/test/trace/testdata/traces/zipkin.json index 7dd6d5a6..d90ee6af 100644 --- a/pkg/test/trace/testdata/traces/zipkin.json +++ b/pkg/test/trace/testdata/traces/zipkin.json @@ -24,10 +24,6 @@ "name": "kind", "type": "TAG_TYPE_STRING" }, - { - "name": "timestamp", - "type": "TAG_TYPE_TIMESTAMP" - }, { "name": "duration", "type": "TAG_TYPE_INT" @@ -63,6 +59,14 @@ { "name": "debug", "type": "TAG_TYPE_INT" + }, + { + "name": "query", + "type": "TAG_TYPE_STRING_ARRAY" + }, + { + "name": "timestamp", + "type": "TAG_TYPE_TIMESTAMP" } ], "trace_id_tag_name": "trace_id", diff --git a/test/cases/init.go b/test/cases/init.go index 7f335558..9c4bb645 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -64,7 +64,7 @@ func Initialize(addr string, now time.Time) { // trace interval = 500 * time.Millisecond casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now, interval) - casestrace.Write(conn, "zipkin", now, interval) + casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now, interval) time.Sleep(5 * time.Second) casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw_mixed_traces", now.Add(time.Minute), interval) } diff --git a/test/cases/stream/data/input/err_in_arr.yaml b/test/cases/stream/data/input/err_in_arr.yaml new file mode 100644 index 00000000..29b1a098 --- /dev/null +++ b/test/cases/stream/data/input/err_in_arr.yaml @@ -0,0 +1,32 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "sw" +groups: ["default"] +projection: + tagFamilies: + - name: "searchable" + tags: ["trace_id", "extended_tags"] + - name: "data" + tags: ["data_binary"] +criteria: + condition: + name: "extended_tags" + op: "BINARY_OP_IN" + value: + strArray: + value: ["c", "b"] diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go index b2376586..97ae14a5 100644 --- a/test/cases/stream/stream.go +++ b/test/cases/stream/stream.go @@ -89,4 +89,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) { g.Entry("multi-groups: update tag type", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour, IgnoreElementID: true}), g.Entry("multi-groups: sort duration", helpers.Args{Input: "multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}), g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1 * time.Hour, IgnoreElementID: true}), + g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}), ) diff --git a/test/cases/trace/data/input/err_in_arr.yml b/test/cases/trace/data/input/err_in_arr.yml new file mode 100644 index 00000000..dc2a33dc --- /dev/null +++ b/test/cases/trace/data/input/err_in_arr.yml @@ -0,0 +1,50 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "zipkin" +groups: ["zipkinTrace"] +tag_projection: ["trace_id", "span_id", "operation_name", "query"] +order_by: + index_rule_name: "zipkin-timestamp" + sort: "SORT_DESC" +criteria: + le: + op: "LOGICAL_OP_AND" + left: + condition: + name: "query" + op: "BINARY_OP_IN" + value: + strArray: + value: ["SELECT * FROM users"] + right: + le: + op: "LOGICAL_OP_AND" + left: + condition: + name: "span_id" + op: "BINARY_OP_EQ" + value: + str: + value: "span_002" + right: + condition: + name: "operation_name" + op: "BINARY_OP_EQ" + value: + str: + value: "/db/query" diff --git a/test/cases/trace/data/input/having_query_tag.yml b/test/cases/trace/data/input/having_query_tag.yml new file mode 100644 index 00000000..59ffde14 --- /dev/null +++ b/test/cases/trace/data/input/having_query_tag.yml @@ -0,0 +1,50 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "zipkin" +groups: ["zipkinTrace"] +tag_projection: ["trace_id", "span_id", "operation_name", "query"] +order_by: + index_rule_name: "zipkin-timestamp" + sort: "SORT_DESC" +criteria: + le: + op: "LOGICAL_OP_AND" + left: + condition: + name: "query" + op: "BINARY_OP_HAVING" + value: + strArray: + value: ["SELECT * FROM users"] + right: + le: + op: "LOGICAL_OP_AND" + left: + condition: + name: "span_id" + op: "BINARY_OP_EQ" + value: + str: + value: "span_002" + right: + condition: + name: "operation_name" + op: "BINARY_OP_EQ" + value: + str: + value: "/db/query" diff --git a/test/cases/trace/data/testdata/zipkin.json b/test/cases/trace/data/testdata/zipkin.json index 19a636f3..9b21b412 100644 --- a/test/cases/trace/data/testdata/zipkin.json +++ b/test/cases/trace/data/testdata/zipkin.json @@ -70,6 +70,15 @@ "int": { "value": 0 } + }, + { + "str_array": { + "value": [ + "user_id=123", + "limit=10", + "offset=0" + ] + } } ], "span": "zipkin_trace_001_span_001" @@ -145,6 +154,14 @@ "int": { "value": 0 } + }, + { + "str_array": { + "value": [ + "SELECT * FROM users", + "WHERE id=123" + ] + } } ], "span": "zipkin_trace_001_span_002" @@ -220,6 +237,15 @@ "int": { "value": 0 } + }, + { + "str_array": { + "value": [ + "order_id=456", + "status=pending", + "customer_id=789" + ] + } } ], "span": "zipkin_trace_002_span_003" @@ -295,6 +321,14 @@ "int": { "value": 0 } + }, + { + "str_array": { + "value": [ + "GET order:456", + "ttl=3600" + ] + } } ], "span": "zipkin_trace_002_span_004" @@ -370,6 +404,13 @@ "int": { "value": 1 } + }, + { + "str_array": { + "value": [ + "health_check=true" + ] + } } ], "span": "zipkin_trace_003_span_005" diff --git a/test/cases/trace/data/want/having_query_tag.yml b/test/cases/trace/data/want/having_query_tag.yml new file mode 100644 index 00000000..dd3880cc --- /dev/null +++ b/test/cases/trace/data/want/having_query_tag.yml @@ -0,0 +1,61 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +traces: + - spans: + - span: zipkin_trace_001_span_001 + tags: + - key: span_id + value: + str: + value: span_001 + - key: operation_name + value: + str: + value: /api/users + - key: query + value: + strArray: + value: + - user_id=123 + - limit=10 + - offset=0 + - key: trace_id + value: + str: + value: zipkin_trace_001 + - span: zipkin_trace_001_span_002 + tags: + - key: span_id + value: + str: + value: span_002 + - key: operation_name + value: + str: + value: /db/query + - key: query + value: + strArray: + value: + - SELECT * FROM users + - WHERE id=123 + - key: trace_id + value: + str: + value: zipkin_trace_001 + \ No newline at end of file diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go index 570b5526..fd767504 100644 --- a/test/cases/trace/trace.go +++ b/test/cases/trace/trace.go @@ -51,4 +51,6 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) { g.Entry("filter by endpoint", helpers.Args{Input: "eq_endpoint_order_duration_asc", Duration: 1 * time.Hour}), g.Entry("order by timestamp limit 2", helpers.Args{Input: "order_timestamp_desc_limit", Duration: 1 * time.Hour}), g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}), + g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}), + g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}), )
