This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/condition_in
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3b0c3d4832a555b7d3d029fd4c772d48e50ca537
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 19 10:59:04 2025 +0800

    Add new test cases for Zipkin trace with query tag support
    
    This commit introduces two new YAML test files, both named 
`having_query_tag.yml`: one in the input directory and one in the expected 
output (`want`) directory. The input 
file defines a trace with specific criteria for querying by tags, while the 
output file contains the expected trace structure, including spans and 
associated tags. These additions enhance the testing capabilities for trace 
queries involving the 'query' tag.
---
 banyand/internal/sidx/introducer.go                |  2 +-
 banyand/internal/sidx/snapshot.go                  | 71 ++--------------------
 banyand/internal/sidx/snapshot_test.go             | 17 ------
 pkg/query/logical/stream/index_filter.go           | 20 +++++-
 pkg/query/logical/stream/stream_plan_tag_filter.go |  2 +-
 pkg/query/logical/tag_filter.go                    | 27 ++++++--
 pkg/query/logical/trace/index_filter.go            | 37 +++++++----
 pkg/query/logical/trace/trace_plan_tag_filter.go   |  4 +-
 .../trace/testdata/index_rule_bindings/zipkin.json |  4 +-
 .../testdata/index_rules/zipkin-duration.json      | 14 -----
 .../trace/testdata/index_rules/zipkin-service.json | 14 -----
 pkg/test/trace/testdata/traces/zipkin.json         | 12 ++--
 test/cases/init.go                                 |  2 +-
 test/cases/stream/data/input/err_in_arr.yaml       | 32 ++++++++++
 test/cases/stream/stream.go                        |  1 +
 test/cases/trace/data/input/err_in_arr.yml         | 50 +++++++++++++++
 test/cases/trace/data/input/having_query_tag.yml   | 50 +++++++++++++++
 test/cases/trace/data/testdata/zipkin.json         | 41 +++++++++++++
 test/cases/trace/data/want/having_query_tag.yml    | 61 +++++++++++++++++++
 test/cases/trace/trace.go                          |  2 +
 20 files changed, 322 insertions(+), 141 deletions(-)

diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index b3801a65..3088b186 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -147,7 +147,7 @@ func (s *sidx) introduceMemPart(nextIntroduction 
*introduction, epoch uint64) {
        if cur != nil {
                defer cur.decRef()
        } else {
-               cur = generateSnapshot()
+               cur = &snapshot{}
        }
 
        next := nextIntroduction.memPart
diff --git a/banyand/internal/sidx/snapshot.go 
b/banyand/internal/sidx/snapshot.go
index e06fcc41..d6612503 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -27,7 +27,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -46,19 +45,15 @@ type snapshot struct {
 
        // ref is the atomic reference counter for safe concurrent access
        ref int32
-
-       // released tracks if this snapshot has been released
-       released atomic.Bool
 }
 
 // newSnapshot creates a new snapshot with the given parts and epoch.
 // The snapshot starts with a reference count of 1.
 func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot {
-       s := generateSnapshot()
+       s := &snapshot{}
        s.parts = append(s.parts[:0], parts...)
        s.epoch = epoch
        s.ref = 1
-       s.released.Store(false)
 
        // Acquire references to all parts to ensure they remain valid
        for _, pw := range s.parts {
@@ -77,25 +72,7 @@ func newSnapshot(parts []*partWrapper, epoch uint64) 
*snapshot {
 // acquire increments the snapshot reference count.
 // Returns true if successful, false if snapshot has been released.
 func (s *snapshot) acquire() bool {
-       if s.released.Load() {
-               return false
-       }
-
-       for {
-               oldRef := atomic.LoadInt32(&s.ref)
-               if oldRef <= 0 {
-                       return false
-               }
-
-               if atomic.CompareAndSwapInt32(&s.ref, oldRef, oldRef+1) {
-                       // Double-check that snapshot wasn't released during 
acquire
-                       if s.released.Load() {
-                               s.release()
-                               return false
-                       }
-                       return true
-               }
-       }
+       return atomic.AddInt32(&s.ref, 1) > 0
 }
 
 // decRef decrements the snapshot reference count (helper for snapshot 
interface).
@@ -118,11 +95,6 @@ func (s *snapshot) release() {
                        Msg("snapshot reference count went negative")
                return
        }
-
-       // Mark as released first
-       s.released.Store(true)
-       // Return to pool
-       releaseSnapshot(s)
 }
 
 // getParts returns parts that potentially contain data within the specified 
key range.
@@ -190,17 +162,8 @@ func (s *snapshot) refCount() int32 {
        return atomic.LoadInt32(&s.ref)
 }
 
-// isReleased returns true if the snapshot has been released.
-func (s *snapshot) isReleased() bool {
-       return s.released.Load()
-}
-
 // validate checks snapshot consistency and part availability.
 func (s *snapshot) validate() error {
-       if s.released.Load() {
-               return fmt.Errorf("snapshot has been released")
-       }
-
        if atomic.LoadInt32(&s.ref) <= 0 {
                return fmt.Errorf("snapshot has zero or negative reference 
count")
        }
@@ -280,7 +243,6 @@ func (s *snapshot) reset() {
        s.parts = s.parts[:0]
        s.epoch = 0
        s.ref = 0
-       s.released.Store(false)
 }
 
 // String returns a string representation of the snapshot.
@@ -329,34 +291,12 @@ func mustReadSnapshot(fileSystem fs.FileSystem, root 
string, snapshot uint64) []
        return result
 }
 
-// Pool for snapshot reuse.
-var snapshotPool = pool.Register[*snapshot]("sidx-snapshot")
-
-// generateSnapshot gets a snapshot from the pool or creates a new one.
-func generateSnapshot() *snapshot {
-       v := snapshotPool.Get()
-       if v == nil {
-               return &snapshot{}
-       }
-       return v
-}
-
-// releaseSnapshot returns a snapshot to the pool after reset.
-func releaseSnapshot(s *snapshot) {
-       if s == nil {
-               return
-       }
-       s.reset()
-       snapshotPool.Put(s)
-}
-
 // copyAllTo creates a new snapshot with all parts from current snapshot.
 func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
        var result snapshot
        result.parts = make([]*partWrapper, len(s.parts))
        result.epoch = epoch
        result.ref = 1
-       result.released.Store(false)
 
        // Copy all parts and acquire references
        copy(result.parts, s.parts)
@@ -379,8 +319,9 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts 
map[uint64]*partWrapper) *s
                        result.parts = append(result.parts, n)
                        continue
                }
-               s.parts[i].acquire()
-               result.parts = append(result.parts, s.parts[i])
+               if s.parts[i].acquire() {
+                       result.parts = append(result.parts, s.parts[i])
+               }
        }
        return &result
 }
@@ -390,7 +331,6 @@ func (s *snapshot) remove(epoch uint64, toRemove 
map[uint64]struct{}) *snapshot
        var result snapshot
        result.epoch = epoch
        result.ref = 1
-       result.released.Store(false)
 
        // Copy parts except those being removed
        for _, pw := range s.parts {
@@ -399,6 +339,7 @@ func (s *snapshot) remove(epoch uint64, toRemove 
map[uint64]struct{}) *snapshot
                                result.parts = append(result.parts, pw)
                        }
                }
+               pw.markForRemoval()
        }
 
        return &result
diff --git a/banyand/internal/sidx/snapshot_test.go 
b/banyand/internal/sidx/snapshot_test.go
index 9029edaf..3322a459 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -55,10 +55,6 @@ func TestSnapshot_Creation(t *testing.T) {
        if snapshot.getPartCount() != len(parts) {
                t.Errorf("expected part count %d, got %d", len(parts), 
snapshot.getPartCount())
        }
-
-       if snapshot.isReleased() {
-               t.Error("snapshot should not be released")
-       }
 }
 
 func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
@@ -81,18 +77,8 @@ func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
                t.Errorf("expected ref count 1, got %d", snapshot.refCount())
        }
 
-       // Check state before final release
-       isReleasedBefore := snapshot.isReleased()
-       if isReleasedBefore {
-               t.Error("snapshot should not be released before final release")
-       }
-
        // Final release should clean up
        snapshot.release()
-
-       // After final release, the snapshot object may be reset and returned 
to pool
-       // so we can't reliably check its state. The important thing is that it
-       // doesn't crash and the cleanup happens properly.
 }
 
 func TestSnapshot_ReferenceCountingConcurrent(t *testing.T) {
@@ -349,9 +335,6 @@ func TestSnapshot_PoolReuse(t *testing.T) {
        if snapshot2.refCount() != 1 {
                t.Errorf("expected ref count 1, got %d", snapshot2.refCount())
        }
-       if snapshot2.isReleased() {
-               t.Error("new snapshot should not be released")
-       }
 }
 
 // Helper types and functions.
diff --git a/pkg/query/logical/stream/index_filter.go 
b/pkg/query/logical/stream/index_filter.go
index 8cce87ad..89a715e8 100644
--- a/pkg/query/logical/stream/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -50,7 +50,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
                        return nil, parsedEntity, nil
                }
                if ok, indexRule := schema.IndexDefined(cond.Name); ok && 
indexRule.Type == indexRuleType {
-                       return parseConditionToFilter(cond, indexRule, expr, 
entity)
+                       return parseConditionToFilter(cond, indexRule, expr, 
entity, schema)
                }
                return ENode, [][]*modelv1.TagValue{entity}, nil
        case *modelv1.Criteria_Le:
@@ -103,7 +103,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
 }
 
 func parseConditionToFilter(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule,
-       expr logical.LiteralExpr, entity []*modelv1.TagValue,
+       expr logical.LiteralExpr, entity []*modelv1.TagValue, schema 
logical.Schema,
 ) (index.Filter, [][]*modelv1.TagValue, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
@@ -124,6 +124,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
        case modelv1.Condition_BINARY_OP_NE:
                return newNot(indexRule, newEq(indexRule, expr)), 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_HAVING:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec == nil {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v 
for skipping index", cond)
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -135,6 +139,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return and, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec == nil {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v 
for skipping index", cond)
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -146,6 +154,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, 
nil
        case modelv1.Condition_BINARY_OP_IN:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "in condition is not 
supported for array type")
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -157,6 +169,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return or, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "not in condition is not 
supported for array type")
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index 3ab17826..a80e4b6c 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -83,7 +83,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
        ctx.projectionTags = projTags
        plan := uis.selectIndexScanner(ctx, uis.ec)
        if uis.criteria != nil {
-               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, false, "")
+               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, false, "")
                if errFilter != nil {
                        return nil, errFilter
                }
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cd2b2e2f..9480548c 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -25,6 +25,7 @@ import (
        "github.com/blugelabs/bluge/analysis"
        "github.com/pkg/errors"
 
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index/analyzer"
@@ -76,11 +77,13 @@ type TagFilter interface {
 
 // BuildSimpleTagFilter returns a TagFilter without any local-index, global 
index, sharding key support.
 func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
-       return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false, "")
+       return BuildTagFilter(criteria, nil, nil, emptyIndexChecker{}, false, 
"")
 }
 
 // BuildTagFilter returns a TagFilter.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string) 
(TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
schema Schema,
+       indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string,
+) (TagFilter, error) {
        if criteria == nil {
                return DummyFilter, nil
        }
@@ -100,14 +103,14 @@ func BuildTagFilter(criteria *modelv1.Criteria, 
entityDict map[string]int, index
                if cond.Name == globalTagName {
                        return DummyFilter, nil
                }
-               return parseFilter(cond, expr, indexChecker)
+               return parseFilter(cond, expr, schema, indexChecker)
        case *modelv1.Criteria_Le:
                le := criteria.GetLe()
-               left, err := BuildTagFilter(le.Left, entityDict, indexChecker, 
hasGlobalIndex, globalTagName)
+               left, err := BuildTagFilter(le.Left, entityDict, schema, 
indexChecker, hasGlobalIndex, globalTagName)
                if err != nil {
                        return nil, err
                }
-               right, err := BuildTagFilter(le.Right, entityDict, 
indexChecker, hasGlobalIndex, globalTagName)
+               right, err := BuildTagFilter(le.Right, entityDict, schema, 
indexChecker, hasGlobalIndex, globalTagName)
                if err != nil {
                        return nil, err
                }
@@ -131,7 +134,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, index
        return nil, ErrInvalidCriteriaType
 }
 
-func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker 
IndexChecker) (TagFilter, error) {
+func parseFilter(cond *modelv1.Condition, expr ComparableExpr, schema Schema, 
indexChecker IndexChecker) (TagFilter, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return newRangeTag(cond.Name, rangeOpts{
@@ -162,8 +165,20 @@ func parseFilter(cond *modelv1.Condition, expr 
ComparableExpr, indexChecker Inde
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                return newNotTag(newHavingTag(cond.Name, expr)), nil
        case modelv1.Condition_BINARY_OP_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, 
errors.WithMessagef(ErrUnsupportedConditionOp, "in condition is not supported 
for array type")
+                       }
+               }
                return newInTag(cond.Name, expr), nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, 
errors.WithMessagef(ErrUnsupportedConditionOp, "not in condition is not 
supported for array type")
+                       }
+               }
                return newNotTag(newInTag(cond.Name, expr)), nil
        default:
                return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag 
filter parses %v", cond)
diff --git a/pkg/query/logical/trace/index_filter.go 
b/pkg/query/logical/trace/index_filter.go
index 44fad4d3..6faa950d 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -23,6 +23,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -34,7 +35,7 @@ import (
 // Trace conditions are either entity or skipping index.
 // Trace creates explicit index rules for skipping index on all tags that 
don't belong to entity.
 // Returns min/max int64 values for the orderByTag if provided, otherwise 
returns math.MaxInt64, math.MinInt64.
-func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool,
+func buildFilter(criteria *modelv1.Criteria, schema logical.Schema, tagNames 
map[string]bool,
        entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName 
string, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        if criteria == nil {
@@ -43,15 +44,17 @@ func buildFilter(criteria *modelv1.Criteria, tagNames 
map[string]bool,
 
        switch criteria.GetExp().(type) {
        case *modelv1.Criteria_Condition:
-               return buildFilterFromCondition(criteria.GetCondition(), 
tagNames, entityDict, entity, traceIDTagName, orderByTag)
+               return buildFilterFromCondition(criteria.GetCondition(), 
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
        case *modelv1.Criteria_Le:
-               return buildFilterFromLogicalExpression(criteria.GetLe(), 
tagNames, entityDict, entity, traceIDTagName, orderByTag)
+               return buildFilterFromLogicalExpression(criteria.GetLe(), 
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
        }
 
        return nil, nil, nil, nil, math.MinInt64, math.MaxInt64, 
logical.ErrInvalidCriteriaType
 }
 
-func parseConditionToFilter(cond *modelv1.Condition, entity 
[]*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, 
[][]*modelv1.TagValue, error) {
+func parseConditionToFilter(cond *modelv1.Condition, schema logical.Schema,
+       entity []*modelv1.TagValue, expr logical.LiteralExpr,
+) (index.Filter, [][]*modelv1.TagValue, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return &traceRangeFilter{op: "gt", tagName: cond.Name, cond: 
cond, expr: expr}, [][]*modelv1.TagValue{entity}, nil
@@ -72,8 +75,20 @@ func parseConditionToFilter(cond *modelv1.Condition, entity 
[]*modelv1.TagValue,
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                return &traceFilter{op: "not_having", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, nil, errors.Errorf("in condition is 
not supported for array type")
+                       }
+               }
                return &traceFilter{op: "in", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, nil, errors.Errorf("not in 
condition is not supported for array type")
+                       }
+               }
                return &traceFilter{op: "not_in", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        }
        return nil, nil, errors.Errorf("unsupported condition operation: %v", 
cond.Op)
@@ -276,7 +291,7 @@ func extractTraceIDsFromCondition(cond *modelv1.Condition) 
[]string {
 }
 
 // buildFilterFromCondition handles single condition filtering and min/max 
extraction.
-func buildFilterFromCondition(cond *modelv1.Condition, tagNames 
map[string]bool, entityDict map[string]int,
+func buildFilterFromCondition(cond *modelv1.Condition, schema logical.Schema, 
tagNames map[string]bool, entityDict map[string]int,
        entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        var collectedTagNames []string
@@ -317,12 +332,12 @@ func buildFilterFromCondition(cond *modelv1.Condition, 
tagNames map[string]bool,
        if err != nil {
                return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, 
err
        }
-       filter, entities, err := parseConditionToFilter(cond, entity, expr)
+       filter, entities, err := parseConditionToFilter(cond, schema, entity, 
expr)
        return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, 
err
 }
 
 // buildFilterFromLogicalExpression handles logical expression (AND/OR) 
filtering and min/max extraction.
-func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames 
map[string]bool, entityDict map[string]int,
+func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, schema 
logical.Schema, tagNames map[string]bool, entityDict map[string]int,
        entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        var collectedTagNames []string
@@ -334,17 +349,17 @@ func buildFilterFromLogicalExpression(le 
*modelv1.LogicalExpression, tagNames ma
                return nil, nil, nil, traceIDs, minVal, maxVal, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and 
right) of [%v] are empty", le)
        }
        if le.GetLeft() == nil {
-               return buildFilter(le.Right, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
+               return buildFilter(le.Right, schema, tagNames, entityDict, 
entity, traceIDTagName, orderByTag)
        }
        if le.GetRight() == nil {
-               return buildFilter(le.Left, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
+               return buildFilter(le.Left, schema, tagNames, entityDict, 
entity, traceIDTagName, orderByTag)
        }
 
-       left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err 
:= buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
+       left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err 
:= buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
        if err != nil {
                return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err
        }
-       right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, 
err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
+       right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, 
err := buildFilter(le.Right, schema, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
        if err != nil {
                return nil, nil, append(leftTagNames, rightTagNames...), 
append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err
        }
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go 
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 31f2cd4d..270d795d 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -115,7 +115,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s 
logical.Schema) (logical.Plan, er
        }
        plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
        if uis.criteria != nil {
-               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, len(traceIDs) > 0, uis.traceIDTagName)
+               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName)
                if errFilter != nil {
                        return nil, errFilter
                }
@@ -189,7 +189,7 @@ func buildTraceFilter(criteria *modelv1.Criteria, s 
logical.Schema, entityDict m
                tagNames[tagName] = true
        }
 
-       filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := 
buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+       filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := 
buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
        return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, 
err
 }
 
diff --git a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json 
b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
index f1e994d7..cf03fabb 100644
--- a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
+++ b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
@@ -4,9 +4,7 @@
         "group": "zipkinTrace"
     },
     "rules": [
-        "zipkin-duration",
-        "zipkin-timestamp",
-        "zipkin-service"
+        "zipkin-timestamp"
     ],
     "subject": {
         "catalog": "CATALOG_TRACE",
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-duration.json 
b/pkg/test/trace/testdata/index_rules/zipkin-duration.json
deleted file mode 100644
index f8c5ad76..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-duration.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "metadata": {
-        "name": "zipkin-duration",
-        "group": "zipkinTrace"
-    },
-    "tags": [
-        "local_endpoint_service_name",
-        "operation_name",
-        "kind",
-        "duration"
-    ],
-    "type": "TYPE_TREE",
-    "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-service.json 
b/pkg/test/trace/testdata/index_rules/zipkin-service.json
deleted file mode 100644
index 486d1512..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-service.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "metadata": {
-        "name": "zipkin-service",
-        "group": "zipkinTrace"
-    },
-    "tags": [
-        "local_endpoint_service_name",
-        "remote_endpoint_service_name",
-        "operation_name",
-        "kind"
-    ],
-    "type": "TYPE_TREE",
-    "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/zipkin.json 
b/pkg/test/trace/testdata/traces/zipkin.json
index 7dd6d5a6..d90ee6af 100644
--- a/pkg/test/trace/testdata/traces/zipkin.json
+++ b/pkg/test/trace/testdata/traces/zipkin.json
@@ -24,10 +24,6 @@
             "name": "kind",
             "type": "TAG_TYPE_STRING"
         },
-        {
-            "name": "timestamp",
-            "type": "TAG_TYPE_TIMESTAMP"
-        },
         {
             "name": "duration",
             "type": "TAG_TYPE_INT"
@@ -63,6 +59,14 @@
         {
             "name": "debug",
             "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "query",
+            "type": "TAG_TYPE_STRING_ARRAY"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
         }
     ],
     "trace_id_tag_name": "trace_id",
diff --git a/test/cases/init.go b/test/cases/init.go
index 7f335558..9c4bb645 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -64,7 +64,7 @@ func Initialize(addr string, now time.Time) {
        // trace
        interval = 500 * time.Millisecond
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now, 
interval)
-       casestrace.Write(conn, "zipkin", now, interval)
+       casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now, 
interval)
        time.Sleep(5 * time.Second)
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", 
"sw_mixed_traces", now.Add(time.Minute), interval)
 }
diff --git a/test/cases/stream/data/input/err_in_arr.yaml 
b/test/cases/stream/data/input/err_in_arr.yaml
new file mode 100644
index 00000000..29b1a098
--- /dev/null
+++ b/test/cases/stream/data/input/err_in_arr.yaml
@@ -0,0 +1,32 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+  tagFamilies:
+  - name: "searchable"
+    tags: ["trace_id", "extended_tags"]
+  - name: "data"
+    tags: ["data_binary"]
+criteria:
+  condition:
+    name: "extended_tags"
+    op: "BINARY_OP_IN"
+    value:
+      strArray:
+        value: ["c", "b"]
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index b2376586..97ae14a5 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -89,4 +89,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
        g.Entry("multi-groups: update tag type", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour, IgnoreElementID: true}),
        g.Entry("multi-groups: sort duration", helpers.Args{Input: "multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}),
        g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1 * time.Hour, IgnoreElementID: true}),
+       g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}),
 )
diff --git a/test/cases/trace/data/input/err_in_arr.yml b/test/cases/trace/data/input/err_in_arr.yml
new file mode 100644
index 00000000..dc2a33dc
--- /dev/null
+++ b/test/cases/trace/data/input/err_in_arr.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+  index_rule_name: "zipkin-timestamp"
+  sort: "SORT_DESC"
+criteria:
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "query"
+        op: "BINARY_OP_IN"
+        value:
+          strArray:
+            value: ["SELECT * FROM users"]
+    right:
+      le:
+        op: "LOGICAL_OP_AND"
+        left:
+          condition:
+            name: "span_id"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "span_002"
+        right:
+          condition:
+            name: "operation_name"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "/db/query"
diff --git a/test/cases/trace/data/input/having_query_tag.yml b/test/cases/trace/data/input/having_query_tag.yml
new file mode 100644
index 00000000..59ffde14
--- /dev/null
+++ b/test/cases/trace/data/input/having_query_tag.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+  index_rule_name: "zipkin-timestamp"
+  sort: "SORT_DESC"
+criteria:
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "query"
+        op: "BINARY_OP_HAVING"
+        value:
+          strArray:
+            value: ["SELECT * FROM users"]
+    right:
+      le:
+        op: "LOGICAL_OP_AND"
+        left:
+          condition:
+            name: "span_id"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "span_002"
+        right:
+          condition:
+            name: "operation_name"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "/db/query"
diff --git a/test/cases/trace/data/testdata/zipkin.json b/test/cases/trace/data/testdata/zipkin.json
index 19a636f3..9b21b412 100644
--- a/test/cases/trace/data/testdata/zipkin.json
+++ b/test/cases/trace/data/testdata/zipkin.json
@@ -70,6 +70,15 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "user_id=123",
+                        "limit=10",
+                        "offset=0"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_001_span_001"
@@ -145,6 +154,14 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "SELECT * FROM users",
+                        "WHERE id=123"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_001_span_002"
@@ -220,6 +237,15 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "order_id=456",
+                        "status=pending",
+                        "customer_id=789"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_002_span_003"
@@ -295,6 +321,14 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "GET order:456",
+                        "ttl=3600"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_002_span_004"
@@ -370,6 +404,13 @@
                 "int": {
                     "value": 1
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "health_check=true"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_003_span_005"
diff --git a/test/cases/trace/data/want/having_query_tag.yml b/test/cases/trace/data/want/having_query_tag.yml
new file mode 100644
index 00000000..dd3880cc
--- /dev/null
+++ b/test/cases/trace/data/want/having_query_tag.yml
@@ -0,0 +1,61 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+  - spans:
+    - span: zipkin_trace_001_span_001
+      tags:
+      - key: span_id
+        value:
+          str:
+            value: span_001
+      - key: operation_name
+        value:
+          str:
+            value: /api/users
+      - key: query
+        value:
+          strArray:
+            value:
+            - user_id=123
+            - limit=10
+            - offset=0
+      - key: trace_id
+        value:
+          str:
+            value: zipkin_trace_001
+    - span: zipkin_trace_001_span_002
+      tags:
+      - key: span_id
+        value:
+          str:
+            value: span_002
+      - key: operation_name
+        value:
+          str:
+            value: /db/query
+      - key: query
+        value:
+          strArray:
+            value:
+            - SELECT * FROM users
+            - WHERE id=123
+      - key: trace_id
+        value:
+          str:
+            value: zipkin_trace_001
+            
\ No newline at end of file
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 570b5526..fd767504 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -51,4 +51,6 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) {
        g.Entry("filter by endpoint", helpers.Args{Input: "eq_endpoint_order_duration_asc", Duration: 1 * time.Hour}),
        g.Entry("order by timestamp limit 2", helpers.Args{Input: "order_timestamp_desc_limit", Duration: 1 * time.Hour}),
        g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
+       g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}),
+       g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}),
 )


Reply via email to