This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new a34e4615 Fix returning empty result when using IN operator on array type tags (#775)
a34e4615 is described below
commit a34e4615eddb0dd614152177391f5d2db5522d3c
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 19 13:38:51 2025 +0800
Fix returning empty result when using IN operator on array type tags (#775)
---
CHANGES.md | 1 +
banyand/internal/sidx/introducer.go | 2 +-
banyand/internal/sidx/snapshot.go | 72 +++-------------------
banyand/internal/sidx/snapshot_test.go | 17 -----
pkg/query/logical/stream/index_filter.go | 20 +++++-
pkg/query/logical/stream/stream_plan_tag_filter.go | 2 +-
pkg/query/logical/tag_filter.go | 27 ++++++--
pkg/query/logical/trace/index_filter.go | 37 +++++++----
pkg/query/logical/trace/trace_plan_tag_filter.go | 4 +-
.../trace/testdata/index_rule_bindings/zipkin.json | 4 +-
.../testdata/index_rules/zipkin-duration.json | 14 -----
.../trace/testdata/index_rules/zipkin-service.json | 14 -----
pkg/test/trace/testdata/traces/zipkin.json | 12 ++--
test/cases/init.go | 2 +-
test/cases/stream/data/input/err_in_arr.yaml | 32 ++++++++++
test/cases/stream/stream.go | 1 +
test/cases/trace/data/input/err_in_arr.yml | 50 +++++++++++++++
test/cases/trace/data/input/having_query_tag.yml | 50 +++++++++++++++
test/cases/trace/data/testdata/zipkin.json | 41 ++++++++++++
test/cases/trace/data/want/having_query_tag.yml | 61 ++++++++++++++++++
test/cases/trace/trace.go | 2 +
21 files changed, 324 insertions(+), 141 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 63ebb7f6..d788fc11 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,6 +56,7 @@ Release Notes.
- Fix the crash when collecting the metrics from a closed segment.
- Fix topN parsing panic when the criteria is set.
- Remove the indexed_only field in TagSpec.
+- Fix returning empty result when using the IN operator on array type tags.
### Document
diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go
index b3801a65..3088b186 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -147,7 +147,7 @@ func (s *sidx) introduceMemPart(nextIntroduction *introduction, epoch uint64) {
if cur != nil {
defer cur.decRef()
} else {
- cur = generateSnapshot()
+ cur = &snapshot{}
}
next := nextIntroduction.memPart
diff --git a/banyand/internal/sidx/snapshot.go b/banyand/internal/sidx/snapshot.go
index e06fcc41..ce7cf6a6 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -27,7 +27,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/pool"
)
const (
@@ -46,19 +45,15 @@ type snapshot struct {
// ref is the atomic reference counter for safe concurrent access
ref int32
-
- // released tracks if this snapshot has been released
- released atomic.Bool
}
// newSnapshot creates a new snapshot with the given parts and epoch.
// The snapshot starts with a reference count of 1.
func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot {
- s := generateSnapshot()
+ s := &snapshot{}
s.parts = append(s.parts[:0], parts...)
s.epoch = epoch
s.ref = 1
- s.released.Store(false)
// Acquire references to all parts to ensure they remain valid
for _, pw := range s.parts {
@@ -77,25 +72,7 @@ func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot {
// acquire increments the snapshot reference count.
// Returns true if successful, false if snapshot has been released.
func (s *snapshot) acquire() bool {
- if s.released.Load() {
- return false
- }
-
- for {
- oldRef := atomic.LoadInt32(&s.ref)
- if oldRef <= 0 {
- return false
- }
-
- if atomic.CompareAndSwapInt32(&s.ref, oldRef, oldRef+1) {
- // Double-check that snapshot wasn't released during acquire
- if s.released.Load() {
- s.release()
- return false
- }
- return true
- }
- }
+ return atomic.AddInt32(&s.ref, 1) > 0
}
// decRef decrements the snapshot reference count (helper for snapshot interface).
@@ -118,11 +95,7 @@ func (s *snapshot) release() {
Msg("snapshot reference count went negative")
return
}
-
- // Mark as released first
- s.released.Store(true)
- // Return to pool
- releaseSnapshot(s)
+ s.reset()
}
// getParts returns parts that potentially contain data within the specified key range.
@@ -190,17 +163,8 @@ func (s *snapshot) refCount() int32 {
return atomic.LoadInt32(&s.ref)
}
-// isReleased returns true if the snapshot has been released.
-func (s *snapshot) isReleased() bool {
- return s.released.Load()
-}
-
// validate checks snapshot consistency and part availability.
func (s *snapshot) validate() error {
- if s.released.Load() {
- return fmt.Errorf("snapshot has been released")
- }
-
if atomic.LoadInt32(&s.ref) <= 0 {
return fmt.Errorf("snapshot has zero or negative reference
count")
}
@@ -280,7 +244,6 @@ func (s *snapshot) reset() {
s.parts = s.parts[:0]
s.epoch = 0
s.ref = 0
- s.released.Store(false)
}
// String returns a string representation of the snapshot.
@@ -329,34 +292,12 @@ func mustReadSnapshot(fileSystem fs.FileSystem, root string, snapshot uint64) []
return result
}
-// Pool for snapshot reuse.
-var snapshotPool = pool.Register[*snapshot]("sidx-snapshot")
-
-// generateSnapshot gets a snapshot from the pool or creates a new one.
-func generateSnapshot() *snapshot {
- v := snapshotPool.Get()
- if v == nil {
- return &snapshot{}
- }
- return v
-}
-
-// releaseSnapshot returns a snapshot to the pool after reset.
-func releaseSnapshot(s *snapshot) {
- if s == nil {
- return
- }
- s.reset()
- snapshotPool.Put(s)
-}
-
// copyAllTo creates a new snapshot with all parts from current snapshot.
func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
var result snapshot
result.parts = make([]*partWrapper, len(s.parts))
result.epoch = epoch
result.ref = 1
- result.released.Store(false)
// Copy all parts and acquire references
copy(result.parts, s.parts)
@@ -379,8 +320,9 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) *s
result.parts = append(result.parts, n)
continue
}
- s.parts[i].acquire()
- result.parts = append(result.parts, s.parts[i])
+ if s.parts[i].acquire() {
+ result.parts = append(result.parts, s.parts[i])
+ }
}
return &result
}
@@ -390,7 +332,6 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot
var result snapshot
result.epoch = epoch
result.ref = 1
- result.released.Store(false)
// Copy parts except those being removed
for _, pw := range s.parts {
@@ -399,6 +340,7 @@ func (s *snapshot) remove(epoch uint64, toRemove map[uint64]struct{}) *snapshot
result.parts = append(result.parts, pw)
}
}
+ pw.markForRemoval()
}
return &result
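The snapshot.go hunks above replace the pool-backed lifecycle (generateSnapshot/releaseSnapshot plus a separate released flag) with a plain atomic reference count that resets the struct in place. The following is a minimal, self-contained sketch of that pattern; the field and method names mirror the diff, but the types are simplified stand-ins rather than the actual sidx code.

package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot mirrors the simplified lifecycle: a bare atomic reference
// count, no pool and no separate "released" flag.
type snapshot struct {
	parts []string // stand-in for []*partWrapper
	epoch uint64
	ref   int32
}

// acquire increments the reference count; once the count has dropped to
// zero the snapshot is dead, and not re-acquiring it is the caller's job.
func (s *snapshot) acquire() bool {
	return atomic.AddInt32(&s.ref, 1) > 0
}

// release decrements the count and resets the snapshot in place when the
// final reference is dropped, instead of returning it to a pool.
func (s *snapshot) release() {
	if atomic.AddInt32(&s.ref, -1) > 0 {
		return
	}
	s.reset()
}

func (s *snapshot) reset() {
	s.parts = s.parts[:0]
	s.epoch = 0
	s.ref = 0
}

func main() {
	s := &snapshot{parts: []string{"part-1"}, epoch: 1, ref: 1}
	fmt.Println(s.acquire()) // true: ref is now 2
	s.release()
	s.release() // final release resets the struct
	fmt.Println(s.ref, len(s.parts))
}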
diff --git a/banyand/internal/sidx/snapshot_test.go b/banyand/internal/sidx/snapshot_test.go
index 9029edaf..3322a459 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -55,10 +55,6 @@ func TestSnapshot_Creation(t *testing.T) {
if snapshot.getPartCount() != len(parts) {
t.Errorf("expected part count %d, got %d", len(parts),
snapshot.getPartCount())
}
-
- if snapshot.isReleased() {
- t.Error("snapshot should not be released")
- }
}
func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
@@ -81,18 +77,8 @@ func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
t.Errorf("expected ref count 1, got %d", snapshot.refCount())
}
- // Check state before final release
- isReleasedBefore := snapshot.isReleased()
- if isReleasedBefore {
- t.Error("snapshot should not be released before final release")
- }
-
// Final release should clean up
snapshot.release()
-
- // After final release, the snapshot object may be reset and returned to pool
- // so we can't reliably check its state. The important thing is that it
- // doesn't crash and the cleanup happens properly.
}
func TestSnapshot_ReferenceCountingConcurrent(t *testing.T) {
@@ -349,9 +335,6 @@ func TestSnapshot_PoolReuse(t *testing.T) {
if snapshot2.refCount() != 1 {
t.Errorf("expected ref count 1, got %d", snapshot2.refCount())
}
- if snapshot2.isReleased() {
- t.Error("new snapshot should not be released")
- }
}
// Helper types and functions.
diff --git a/pkg/query/logical/stream/index_filter.go b/pkg/query/logical/stream/index_filter.go
index 8cce87ad..89a715e8 100644
--- a/pkg/query/logical/stream/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -50,7 +50,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema,
return nil, parsedEntity, nil
}
if ok, indexRule := schema.IndexDefined(cond.Name); ok && indexRule.Type == indexRuleType {
- return parseConditionToFilter(cond, indexRule, expr, entity)
+ return parseConditionToFilter(cond, indexRule, expr, entity, schema)
}
return ENode, [][]*modelv1.TagValue{entity}, nil
case *modelv1.Criteria_Le:
@@ -103,7 +103,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema,
}
func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.IndexRule,
- expr logical.LiteralExpr, entity []*modelv1.TagValue,
+ expr logical.LiteralExpr, entity []*modelv1.TagValue, schema logical.Schema,
) (index.Filter, [][]*modelv1.TagValue, error) {
switch cond.Op {
case modelv1.Condition_BINARY_OP_GT:
@@ -124,6 +124,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index
case modelv1.Condition_BINARY_OP_NE:
return newNot(indexRule, newEq(indexRule, expr)), [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_HAVING:
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec == nil {
+ return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v for skipping index", cond)
+ }
ee := expr.SubExprs()
l := len(ee)
if l < 1 {
@@ -135,6 +139,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index
}
return and, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_HAVING:
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec == nil {
+ return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v for skipping index", cond)
+ }
ee := expr.SubExprs()
l := len(ee)
if l < 1 {
@@ -146,6 +154,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index
}
return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_IN:
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "in condition is not supported for array type")
+ }
ee := expr.SubExprs()
l := len(ee)
if l < 1 {
@@ -157,6 +169,10 @@ func parseConditionToFilter(cond *modelv1.Condition, indexRule *databasev1.Index
}
return or, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_IN:
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "not in condition is not supported for array type")
+ }
ee := expr.SubExprs()
l := len(ee)
if l < 1 {
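The new guards above all follow the same shape: look up the tag's spec in the schema and refuse to build an index filter when an IN or NOT_IN condition targets a string-array or int-array tag, instead of silently producing an empty result. A compact, self-contained sketch of that check, using stand-in types instead of the generated databasev1/logical packages:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the databasev1.TagType values used by the real check.
type TagType int

const (
	TagTypeString TagType = iota
	TagTypeInt
	TagTypeStringArray
	TagTypeIntArray
)

var errUnsupportedConditionOp = errors.New("unsupported condition operation")

// rejectInOnArray mirrors the added guard: an IN condition on an
// array-typed tag is rejected up front rather than turned into an index
// lookup that can never match.
func rejectInOnArray(tagTypes map[string]TagType, tagName string) error {
	t, known := tagTypes[tagName]
	if !known {
		return nil // unknown tag: leave it to later validation
	}
	if t == TagTypeStringArray || t == TagTypeIntArray {
		return fmt.Errorf("%w: in condition is not supported for array type", errUnsupportedConditionOp)
	}
	return nil
}

func main() {
	tags := map[string]TagType{
		"query":   TagTypeStringArray, // array tag from the zipkin test data
		"span_id": TagTypeString,
	}
	fmt.Println(rejectInOnArray(tags, "query"))   // rejected
	fmt.Println(rejectInOnArray(tags, "span_id")) // <nil>: scalar tags keep using IN
}

The trace test cases later in this commit exercise both sides: having_query_tag keeps using BINARY_OP_HAVING for the array tag, while err_in_arr expects the new error.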
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go
index 3ab17826..a80e4b6c 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -83,7 +83,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error)
ctx.projectionTags = projTags
plan := uis.selectIndexScanner(ctx, uis.ec)
if uis.criteria != nil {
- tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, false, "")
+ tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, s, false, "")
if errFilter != nil {
return nil, errFilter
}
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cd2b2e2f..9480548c 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -25,6 +25,7 @@ import (
"github.com/blugelabs/bluge/analysis"
"github.com/pkg/errors"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index/analyzer"
@@ -76,11 +77,13 @@ type TagFilter interface {
// BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support.
func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
- return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false, "")
+ return BuildTagFilter(criteria, nil, nil, emptyIndexChecker{}, false, "")
}
// BuildTagFilter returns a TagFilter.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema,
+ indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string,
+) (TagFilter, error) {
if criteria == nil {
return DummyFilter, nil
}
@@ -100,14 +103,14 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index
if cond.Name == globalTagName {
return DummyFilter, nil
}
- return parseFilter(cond, expr, indexChecker)
+ return parseFilter(cond, expr, schema, indexChecker)
case *modelv1.Criteria_Le:
le := criteria.GetLe()
- left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex, globalTagName)
+ left, err := BuildTagFilter(le.Left, entityDict, schema, indexChecker, hasGlobalIndex, globalTagName)
if err != nil {
return nil, err
}
- right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex, globalTagName)
+ right, err := BuildTagFilter(le.Right, entityDict, schema, indexChecker, hasGlobalIndex, globalTagName)
if err != nil {
return nil, err
}
@@ -131,7 +134,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, index
return nil, ErrInvalidCriteriaType
}
-func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker IndexChecker) (TagFilter, error) {
+func parseFilter(cond *modelv1.Condition, expr ComparableExpr, schema Schema, indexChecker IndexChecker) (TagFilter, error) {
switch cond.Op {
case modelv1.Condition_BINARY_OP_GT:
return newRangeTag(cond.Name, rangeOpts{
@@ -162,8 +165,20 @@ func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker Inde
case modelv1.Condition_BINARY_OP_NOT_HAVING:
return newNotTag(newHavingTag(cond.Name, expr)), nil
case modelv1.Condition_BINARY_OP_IN:
+ if schema != nil {
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "in condition is not supported for array type")
+ }
+ }
return newInTag(cond.Name, expr), nil
case modelv1.Condition_BINARY_OP_NOT_IN:
+ if schema != nil {
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "not in condition is not supported for array type")
+ }
+ }
return newNotTag(newInTag(cond.Name, expr)), nil
default:
return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag
filter parses %v", cond)
diff --git a/pkg/query/logical/trace/index_filter.go b/pkg/query/logical/trace/index_filter.go
index 44fad4d3..6faa950d 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -34,7 +35,7 @@ import (
// Trace conditions are either entity or skipping index.
// Trace creates explicit index rules for skipping index on all tags that don't belong to entity.
// Returns min/max int64 values for the orderByTag if provided, otherwise returns math.MaxInt64, math.MinInt64.
-func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool,
+func buildFilter(criteria *modelv1.Criteria, schema logical.Schema, tagNames map[string]bool,
entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName string, orderByTag string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) {
if criteria == nil {
@@ -43,15 +44,17 @@ func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool,
switch criteria.GetExp().(type) {
case *modelv1.Criteria_Condition:
- return buildFilterFromCondition(criteria.GetCondition(), tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilterFromCondition(criteria.GetCondition(), schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
case *modelv1.Criteria_Le:
- return buildFilterFromLogicalExpression(criteria.GetLe(), tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilterFromLogicalExpression(criteria.GetLe(), schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
}
return nil, nil, nil, nil, math.MinInt64, math.MaxInt64, logical.ErrInvalidCriteriaType
}
-func parseConditionToFilter(cond *modelv1.Condition, entity []*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, [][]*modelv1.TagValue, error) {
+func parseConditionToFilter(cond *modelv1.Condition, schema logical.Schema,
+ entity []*modelv1.TagValue, expr logical.LiteralExpr,
+) (index.Filter, [][]*modelv1.TagValue, error) {
switch cond.Op {
case modelv1.Condition_BINARY_OP_GT:
return &traceRangeFilter{op: "gt", tagName: cond.Name, cond:
cond, expr: expr}, [][]*modelv1.TagValue{entity}, nil
@@ -72,8 +75,20 @@ func parseConditionToFilter(cond *modelv1.Condition, entity
[]*modelv1.TagValue,
case modelv1.Condition_BINARY_OP_NOT_HAVING:
return &traceFilter{op: "not_having", tagName: cond.Name},
[][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_IN:
+ if schema != nil {
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, nil, errors.Errorf("in condition is not supported for array type")
+ }
+ }
return &traceFilter{op: "in", tagName: cond.Name}, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_IN:
+ if schema != nil {
+ tagSpec := schema.FindTagSpecByName(cond.Name)
+ if tagSpec != nil && (tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+ return nil, nil, errors.Errorf("not in condition is not supported for array type")
+ }
+ }
return &traceFilter{op: "not_in", tagName: cond.Name}, [][]*modelv1.TagValue{entity}, nil
}
return nil, nil, errors.Errorf("unsupported condition operation: %v",
cond.Op)
@@ -276,7 +291,7 @@ func extractTraceIDsFromCondition(cond *modelv1.Condition) []string {
}
// buildFilterFromCondition handles single condition filtering and min/max extraction.
-func buildFilterFromCondition(cond *modelv1.Condition, tagNames map[string]bool, entityDict map[string]int,
+func buildFilterFromCondition(cond *modelv1.Condition, schema logical.Schema, tagNames map[string]bool, entityDict map[string]int,
entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) {
var collectedTagNames []string
@@ -317,12 +332,12 @@ func buildFilterFromCondition(cond *modelv1.Condition, tagNames map[string]bool,
if err != nil {
return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, err
}
- filter, entities, err := parseConditionToFilter(cond, entity, expr)
+ filter, entities, err := parseConditionToFilter(cond, schema, entity, expr)
return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err
}
// buildFilterFromLogicalExpression handles logical expression (AND/OR) filtering and min/max extraction.
-func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames map[string]bool, entityDict map[string]int,
+func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, schema logical.Schema, tagNames map[string]bool, entityDict map[string]int,
entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, error) {
var collectedTagNames []string
@@ -334,17 +349,17 @@ func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames ma
return nil, nil, nil, traceIDs, minVal, maxVal, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", le)
}
if le.GetLeft() == nil {
- return buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilter(le.Right, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
}
if le.GetRight() == nil {
- return buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ return buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
}
- left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err := buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err := buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
if err != nil {
return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err
}
- right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, err := buildFilter(le.Right, schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
if err != nil {
return nil, nil, append(leftTagNames, rightTagNames...), append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err
}
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 31f2cd4d..270d795d 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -115,7 +115,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er
}
plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
if uis.criteria != nil {
- tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(traceIDs) > 0, uis.traceIDTagName)
+ tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName)
if errFilter != nil {
return nil, errFilter
}
@@ -189,7 +189,7 @@ func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict m
tagNames[tagName] = true
}
- filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+ filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName, orderByTag)
return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err
}
diff --git a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
index f1e994d7..cf03fabb 100644
--- a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
+++ b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
@@ -4,9 +4,7 @@
"group": "zipkinTrace"
},
"rules": [
- "zipkin-duration",
- "zipkin-timestamp",
- "zipkin-service"
+ "zipkin-timestamp"
],
"subject": {
"catalog": "CATALOG_TRACE",
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-duration.json b/pkg/test/trace/testdata/index_rules/zipkin-duration.json
deleted file mode 100644
index f8c5ad76..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-duration.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "metadata": {
- "name": "zipkin-duration",
- "group": "zipkinTrace"
- },
- "tags": [
- "local_endpoint_service_name",
- "operation_name",
- "kind",
- "duration"
- ],
- "type": "TYPE_TREE",
- "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-service.json b/pkg/test/trace/testdata/index_rules/zipkin-service.json
deleted file mode 100644
index 486d1512..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-service.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "metadata": {
- "name": "zipkin-service",
- "group": "zipkinTrace"
- },
- "tags": [
- "local_endpoint_service_name",
- "remote_endpoint_service_name",
- "operation_name",
- "kind"
- ],
- "type": "TYPE_TREE",
- "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/zipkin.json b/pkg/test/trace/testdata/traces/zipkin.json
index 7dd6d5a6..d90ee6af 100644
--- a/pkg/test/trace/testdata/traces/zipkin.json
+++ b/pkg/test/trace/testdata/traces/zipkin.json
@@ -24,10 +24,6 @@
"name": "kind",
"type": "TAG_TYPE_STRING"
},
- {
- "name": "timestamp",
- "type": "TAG_TYPE_TIMESTAMP"
- },
{
"name": "duration",
"type": "TAG_TYPE_INT"
@@ -63,6 +59,14 @@
{
"name": "debug",
"type": "TAG_TYPE_INT"
+ },
+ {
+ "name": "query",
+ "type": "TAG_TYPE_STRING_ARRAY"
+ },
+ {
+ "name": "timestamp",
+ "type": "TAG_TYPE_TIMESTAMP"
}
],
"trace_id_tag_name": "trace_id",
diff --git a/test/cases/init.go b/test/cases/init.go
index 7f335558..9c4bb645 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -64,7 +64,7 @@ func Initialize(addr string, now time.Time) {
// trace
interval = 500 * time.Millisecond
casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now,
interval)
- casestrace.Write(conn, "zipkin", now, interval)
+ casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now,
interval)
time.Sleep(5 * time.Second)
casestrace.WriteToGroup(conn, "sw", "test-trace-group",
"sw_mixed_traces", now.Add(time.Minute), interval)
}
diff --git a/test/cases/stream/data/input/err_in_arr.yaml b/test/cases/stream/data/input/err_in_arr.yaml
new file mode 100644
index 00000000..29b1a098
--- /dev/null
+++ b/test/cases/stream/data/input/err_in_arr.yaml
@@ -0,0 +1,32 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+ tagFamilies:
+ - name: "searchable"
+ tags: ["trace_id", "extended_tags"]
+ - name: "data"
+ tags: ["data_binary"]
+criteria:
+ condition:
+ name: "extended_tags"
+ op: "BINARY_OP_IN"
+ value:
+ strArray:
+ value: ["c", "b"]
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index b2376586..97ae14a5 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -89,4 +89,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
g.Entry("multi-groups: update tag type", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour, IgnoreElementID: true}),
g.Entry("multi-groups: sort duration", helpers.Args{Input: "multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}),
g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1 * time.Hour, IgnoreElementID: true}),
+ g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}),
)
diff --git a/test/cases/trace/data/input/err_in_arr.yml b/test/cases/trace/data/input/err_in_arr.yml
new file mode 100644
index 00000000..dc2a33dc
--- /dev/null
+++ b/test/cases/trace/data/input/err_in_arr.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+ index_rule_name: "zipkin-timestamp"
+ sort: "SORT_DESC"
+criteria:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "query"
+ op: "BINARY_OP_IN"
+ value:
+ strArray:
+ value: ["SELECT * FROM users"]
+ right:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "span_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "span_002"
+ right:
+ condition:
+ name: "operation_name"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "/db/query"
diff --git a/test/cases/trace/data/input/having_query_tag.yml b/test/cases/trace/data/input/having_query_tag.yml
new file mode 100644
index 00000000..59ffde14
--- /dev/null
+++ b/test/cases/trace/data/input/having_query_tag.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+ index_rule_name: "zipkin-timestamp"
+ sort: "SORT_DESC"
+criteria:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "query"
+ op: "BINARY_OP_HAVING"
+ value:
+ strArray:
+ value: ["SELECT * FROM users"]
+ right:
+ le:
+ op: "LOGICAL_OP_AND"
+ left:
+ condition:
+ name: "span_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "span_002"
+ right:
+ condition:
+ name: "operation_name"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "/db/query"
diff --git a/test/cases/trace/data/testdata/zipkin.json b/test/cases/trace/data/testdata/zipkin.json
index 19a636f3..9b21b412 100644
--- a/test/cases/trace/data/testdata/zipkin.json
+++ b/test/cases/trace/data/testdata/zipkin.json
@@ -70,6 +70,15 @@
"int": {
"value": 0
}
+ },
+ {
+ "str_array": {
+ "value": [
+ "user_id=123",
+ "limit=10",
+ "offset=0"
+ ]
+ }
}
],
"span": "zipkin_trace_001_span_001"
@@ -145,6 +154,14 @@
"int": {
"value": 0
}
+ },
+ {
+ "str_array": {
+ "value": [
+ "SELECT * FROM users",
+ "WHERE id=123"
+ ]
+ }
}
],
"span": "zipkin_trace_001_span_002"
@@ -220,6 +237,15 @@
"int": {
"value": 0
}
+ },
+ {
+ "str_array": {
+ "value": [
+ "order_id=456",
+ "status=pending",
+ "customer_id=789"
+ ]
+ }
}
],
"span": "zipkin_trace_002_span_003"
@@ -295,6 +321,14 @@
"int": {
"value": 0
}
+ },
+ {
+ "str_array": {
+ "value": [
+ "GET order:456",
+ "ttl=3600"
+ ]
+ }
}
],
"span": "zipkin_trace_002_span_004"
@@ -370,6 +404,13 @@
"int": {
"value": 1
}
+ },
+ {
+ "str_array": {
+ "value": [
+ "health_check=true"
+ ]
+ }
}
],
"span": "zipkin_trace_003_span_005"
diff --git a/test/cases/trace/data/want/having_query_tag.yml b/test/cases/trace/data/want/having_query_tag.yml
new file mode 100644
index 00000000..dd3880cc
--- /dev/null
+++ b/test/cases/trace/data/want/having_query_tag.yml
@@ -0,0 +1,61 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+ - spans:
+ - span: zipkin_trace_001_span_001
+ tags:
+ - key: span_id
+ value:
+ str:
+ value: span_001
+ - key: operation_name
+ value:
+ str:
+ value: /api/users
+ - key: query
+ value:
+ strArray:
+ value:
+ - user_id=123
+ - limit=10
+ - offset=0
+ - key: trace_id
+ value:
+ str:
+ value: zipkin_trace_001
+ - span: zipkin_trace_001_span_002
+ tags:
+ - key: span_id
+ value:
+ str:
+ value: span_002
+ - key: operation_name
+ value:
+ str:
+ value: /db/query
+ - key: query
+ value:
+ strArray:
+ value:
+ - SELECT * FROM users
+ - WHERE id=123
+ - key: trace_id
+ value:
+ str:
+ value: zipkin_trace_001
+
\ No newline at end of file
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 570b5526..fd767504 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -51,4 +51,6 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) {
g.Entry("filter by endpoint", helpers.Args{Input: "eq_endpoint_order_duration_asc", Duration: 1 * time.Hour}),
g.Entry("order by timestamp limit 2", helpers.Args{Input: "order_timestamp_desc_limit", Duration: 1 * time.Hour}),
g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
+ g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}),
+ g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}),
)