This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a34e4615 Fix returning empty result when using IN operator on array 
type tags (#775)
a34e4615 is described below

commit a34e4615eddb0dd614152177391f5d2db5522d3c
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Sep 19 13:38:51 2025 +0800

    Fix returning empty result when using IN operator on array type tags (#775)
---
 CHANGES.md                                         |  1 +
 banyand/internal/sidx/introducer.go                |  2 +-
 banyand/internal/sidx/snapshot.go                  | 72 +++-------------------
 banyand/internal/sidx/snapshot_test.go             | 17 -----
 pkg/query/logical/stream/index_filter.go           | 20 +++++-
 pkg/query/logical/stream/stream_plan_tag_filter.go |  2 +-
 pkg/query/logical/tag_filter.go                    | 27 ++++++--
 pkg/query/logical/trace/index_filter.go            | 37 +++++++----
 pkg/query/logical/trace/trace_plan_tag_filter.go   |  4 +-
 .../trace/testdata/index_rule_bindings/zipkin.json |  4 +-
 .../testdata/index_rules/zipkin-duration.json      | 14 -----
 .../trace/testdata/index_rules/zipkin-service.json | 14 -----
 pkg/test/trace/testdata/traces/zipkin.json         | 12 ++--
 test/cases/init.go                                 |  2 +-
 test/cases/stream/data/input/err_in_arr.yaml       | 32 ++++++++++
 test/cases/stream/stream.go                        |  1 +
 test/cases/trace/data/input/err_in_arr.yml         | 50 +++++++++++++++
 test/cases/trace/data/input/having_query_tag.yml   | 50 +++++++++++++++
 test/cases/trace/data/testdata/zipkin.json         | 41 ++++++++++++
 test/cases/trace/data/want/having_query_tag.yml    | 61 ++++++++++++++++++
 test/cases/trace/trace.go                          |  2 +
 21 files changed, 324 insertions(+), 141 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 63ebb7f6..d788fc11 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,6 +56,7 @@ Release Notes.
 - Fix the crash when collecting the metrics from a closed segment.
 - Fix topN parsing panic when the criteria is set.
 - Remove the indexed_only field in TagSpec.
+- Fix returning empty result when using IN operator on the array type tags.
 
 ### Document
 
diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index b3801a65..3088b186 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -147,7 +147,7 @@ func (s *sidx) introduceMemPart(nextIntroduction 
*introduction, epoch uint64) {
        if cur != nil {
                defer cur.decRef()
        } else {
-               cur = generateSnapshot()
+               cur = &snapshot{}
        }
 
        next := nextIntroduction.memPart
diff --git a/banyand/internal/sidx/snapshot.go 
b/banyand/internal/sidx/snapshot.go
index e06fcc41..ce7cf6a6 100644
--- a/banyand/internal/sidx/snapshot.go
+++ b/banyand/internal/sidx/snapshot.go
@@ -27,7 +27,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 const (
@@ -46,19 +45,15 @@ type snapshot struct {
 
        // ref is the atomic reference counter for safe concurrent access
        ref int32
-
-       // released tracks if this snapshot has been released
-       released atomic.Bool
 }
 
 // newSnapshot creates a new snapshot with the given parts and epoch.
 // The snapshot starts with a reference count of 1.
 func newSnapshot(parts []*partWrapper, epoch uint64) *snapshot {
-       s := generateSnapshot()
+       s := &snapshot{}
        s.parts = append(s.parts[:0], parts...)
        s.epoch = epoch
        s.ref = 1
-       s.released.Store(false)
 
        // Acquire references to all parts to ensure they remain valid
        for _, pw := range s.parts {
@@ -77,25 +72,7 @@ func newSnapshot(parts []*partWrapper, epoch uint64) 
*snapshot {
 // acquire increments the snapshot reference count.
 // Returns true if successful, false if snapshot has been released.
 func (s *snapshot) acquire() bool {
-       if s.released.Load() {
-               return false
-       }
-
-       for {
-               oldRef := atomic.LoadInt32(&s.ref)
-               if oldRef <= 0 {
-                       return false
-               }
-
-               if atomic.CompareAndSwapInt32(&s.ref, oldRef, oldRef+1) {
-                       // Double-check that snapshot wasn't released during 
acquire
-                       if s.released.Load() {
-                               s.release()
-                               return false
-                       }
-                       return true
-               }
-       }
+       return atomic.AddInt32(&s.ref, 1) > 0
 }
 
 // decRef decrements the snapshot reference count (helper for snapshot 
interface).
@@ -118,11 +95,7 @@ func (s *snapshot) release() {
                        Msg("snapshot reference count went negative")
                return
        }
-
-       // Mark as released first
-       s.released.Store(true)
-       // Return to pool
-       releaseSnapshot(s)
+       s.reset()
 }
 
 // getParts returns parts that potentially contain data within the specified 
key range.
@@ -190,17 +163,8 @@ func (s *snapshot) refCount() int32 {
        return atomic.LoadInt32(&s.ref)
 }
 
-// isReleased returns true if the snapshot has been released.
-func (s *snapshot) isReleased() bool {
-       return s.released.Load()
-}
-
 // validate checks snapshot consistency and part availability.
 func (s *snapshot) validate() error {
-       if s.released.Load() {
-               return fmt.Errorf("snapshot has been released")
-       }
-
        if atomic.LoadInt32(&s.ref) <= 0 {
                return fmt.Errorf("snapshot has zero or negative reference 
count")
        }
@@ -280,7 +244,6 @@ func (s *snapshot) reset() {
        s.parts = s.parts[:0]
        s.epoch = 0
        s.ref = 0
-       s.released.Store(false)
 }
 
 // String returns a string representation of the snapshot.
@@ -329,34 +292,12 @@ func mustReadSnapshot(fileSystem fs.FileSystem, root 
string, snapshot uint64) []
        return result
 }
 
-// Pool for snapshot reuse.
-var snapshotPool = pool.Register[*snapshot]("sidx-snapshot")
-
-// generateSnapshot gets a snapshot from the pool or creates a new one.
-func generateSnapshot() *snapshot {
-       v := snapshotPool.Get()
-       if v == nil {
-               return &snapshot{}
-       }
-       return v
-}
-
-// releaseSnapshot returns a snapshot to the pool after reset.
-func releaseSnapshot(s *snapshot) {
-       if s == nil {
-               return
-       }
-       s.reset()
-       snapshotPool.Put(s)
-}
-
 // copyAllTo creates a new snapshot with all parts from current snapshot.
 func (s *snapshot) copyAllTo(epoch uint64) *snapshot {
        var result snapshot
        result.parts = make([]*partWrapper, len(s.parts))
        result.epoch = epoch
        result.ref = 1
-       result.released.Store(false)
 
        // Copy all parts and acquire references
        copy(result.parts, s.parts)
@@ -379,8 +320,9 @@ func (s *snapshot) merge(nextEpoch uint64, nextParts 
map[uint64]*partWrapper) *s
                        result.parts = append(result.parts, n)
                        continue
                }
-               s.parts[i].acquire()
-               result.parts = append(result.parts, s.parts[i])
+               if s.parts[i].acquire() {
+                       result.parts = append(result.parts, s.parts[i])
+               }
        }
        return &result
 }
@@ -390,7 +332,6 @@ func (s *snapshot) remove(epoch uint64, toRemove 
map[uint64]struct{}) *snapshot
        var result snapshot
        result.epoch = epoch
        result.ref = 1
-       result.released.Store(false)
 
        // Copy parts except those being removed
        for _, pw := range s.parts {
@@ -399,6 +340,7 @@ func (s *snapshot) remove(epoch uint64, toRemove 
map[uint64]struct{}) *snapshot
                                result.parts = append(result.parts, pw)
                        }
                }
+               pw.markForRemoval()
        }
 
        return &result
diff --git a/banyand/internal/sidx/snapshot_test.go 
b/banyand/internal/sidx/snapshot_test.go
index 9029edaf..3322a459 100644
--- a/banyand/internal/sidx/snapshot_test.go
+++ b/banyand/internal/sidx/snapshot_test.go
@@ -55,10 +55,6 @@ func TestSnapshot_Creation(t *testing.T) {
        if snapshot.getPartCount() != len(parts) {
                t.Errorf("expected part count %d, got %d", len(parts), 
snapshot.getPartCount())
        }
-
-       if snapshot.isReleased() {
-               t.Error("snapshot should not be released")
-       }
 }
 
 func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
@@ -81,18 +77,8 @@ func TestSnapshot_ReferenceCountingBasic(t *testing.T) {
                t.Errorf("expected ref count 1, got %d", snapshot.refCount())
        }
 
-       // Check state before final release
-       isReleasedBefore := snapshot.isReleased()
-       if isReleasedBefore {
-               t.Error("snapshot should not be released before final release")
-       }
-
        // Final release should clean up
        snapshot.release()
-
-       // After final release, the snapshot object may be reset and returned 
to pool
-       // so we can't reliably check its state. The important thing is that it
-       // doesn't crash and the cleanup happens properly.
 }
 
 func TestSnapshot_ReferenceCountingConcurrent(t *testing.T) {
@@ -349,9 +335,6 @@ func TestSnapshot_PoolReuse(t *testing.T) {
        if snapshot2.refCount() != 1 {
                t.Errorf("expected ref count 1, got %d", snapshot2.refCount())
        }
-       if snapshot2.isReleased() {
-               t.Error("new snapshot should not be released")
-       }
 }
 
 // Helper types and functions.
diff --git a/pkg/query/logical/stream/index_filter.go 
b/pkg/query/logical/stream/index_filter.go
index 8cce87ad..89a715e8 100644
--- a/pkg/query/logical/stream/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -50,7 +50,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
                        return nil, parsedEntity, nil
                }
                if ok, indexRule := schema.IndexDefined(cond.Name); ok && 
indexRule.Type == indexRuleType {
-                       return parseConditionToFilter(cond, indexRule, expr, 
entity)
+                       return parseConditionToFilter(cond, indexRule, expr, 
entity, schema)
                }
                return ENode, [][]*modelv1.TagValue{entity}, nil
        case *modelv1.Criteria_Le:
@@ -103,7 +103,7 @@ func buildLocalFilter(criteria *modelv1.Criteria, schema 
logical.Schema,
 }
 
 func parseConditionToFilter(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule,
-       expr logical.LiteralExpr, entity []*modelv1.TagValue,
+       expr logical.LiteralExpr, entity []*modelv1.TagValue, schema 
logical.Schema,
 ) (index.Filter, [][]*modelv1.TagValue, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
@@ -124,6 +124,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
        case modelv1.Condition_BINARY_OP_NE:
                return newNot(indexRule, newEq(indexRule, expr)), 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_HAVING:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec == nil {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v 
for skipping index", cond)
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -135,6 +139,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return and, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec == nil {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v 
for skipping index", cond)
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -146,6 +154,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, 
nil
        case modelv1.Condition_BINARY_OP_IN:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "in condition is not 
supported for array type")
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
@@ -157,6 +169,10 @@ func parseConditionToFilter(cond *modelv1.Condition, 
indexRule *databasev1.Index
                }
                return or, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               tagSpec := schema.FindTagSpecByName(cond.Name)
+               if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                       return nil, nil, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "not in condition is not 
supported for array type")
+               }
                ee := expr.SubExprs()
                l := len(ee)
                if l < 1 {
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index 3ab17826..a80e4b6c 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -83,7 +83,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
        ctx.projectionTags = projTags
        plan := uis.selectIndexScanner(ctx, uis.ec)
        if uis.criteria != nil {
-               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, false, "")
+               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, false, "")
                if errFilter != nil {
                        return nil, errFilter
                }
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cd2b2e2f..9480548c 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -25,6 +25,7 @@ import (
        "github.com/blugelabs/bluge/analysis"
        "github.com/pkg/errors"
 
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index/analyzer"
@@ -76,11 +77,13 @@ type TagFilter interface {
 
 // BuildSimpleTagFilter returns a TagFilter without any local-index, global 
index, sharding key support.
 func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
-       return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false, "")
+       return BuildTagFilter(criteria, nil, nil, emptyIndexChecker{}, false, 
"")
 }
 
 // BuildTagFilter returns a TagFilter.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string) 
(TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, 
schema Schema,
+       indexChecker IndexChecker, hasGlobalIndex bool, globalTagName string,
+) (TagFilter, error) {
        if criteria == nil {
                return DummyFilter, nil
        }
@@ -100,14 +103,14 @@ func BuildTagFilter(criteria *modelv1.Criteria, 
entityDict map[string]int, index
                if cond.Name == globalTagName {
                        return DummyFilter, nil
                }
-               return parseFilter(cond, expr, indexChecker)
+               return parseFilter(cond, expr, schema, indexChecker)
        case *modelv1.Criteria_Le:
                le := criteria.GetLe()
-               left, err := BuildTagFilter(le.Left, entityDict, indexChecker, 
hasGlobalIndex, globalTagName)
+               left, err := BuildTagFilter(le.Left, entityDict, schema, 
indexChecker, hasGlobalIndex, globalTagName)
                if err != nil {
                        return nil, err
                }
-               right, err := BuildTagFilter(le.Right, entityDict, 
indexChecker, hasGlobalIndex, globalTagName)
+               right, err := BuildTagFilter(le.Right, entityDict, schema, 
indexChecker, hasGlobalIndex, globalTagName)
                if err != nil {
                        return nil, err
                }
@@ -131,7 +134,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, index
        return nil, ErrInvalidCriteriaType
 }
 
-func parseFilter(cond *modelv1.Condition, expr ComparableExpr, indexChecker 
IndexChecker) (TagFilter, error) {
+func parseFilter(cond *modelv1.Condition, expr ComparableExpr, schema Schema, 
indexChecker IndexChecker) (TagFilter, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return newRangeTag(cond.Name, rangeOpts{
@@ -162,8 +165,20 @@ func parseFilter(cond *modelv1.Condition, expr 
ComparableExpr, indexChecker Inde
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                return newNotTag(newHavingTag(cond.Name, expr)), nil
        case modelv1.Condition_BINARY_OP_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, 
errors.WithMessagef(ErrUnsupportedConditionOp, "in condition is not supported 
for array type")
+                       }
+               }
                return newInTag(cond.Name, expr), nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, 
errors.WithMessagef(ErrUnsupportedConditionOp, "not in condition is not 
supported for array type")
+                       }
+               }
                return newNotTag(newInTag(cond.Name, expr)), nil
        default:
                return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag 
filter parses %v", cond)
diff --git a/pkg/query/logical/trace/index_filter.go 
b/pkg/query/logical/trace/index_filter.go
index 44fad4d3..6faa950d 100644
--- a/pkg/query/logical/trace/index_filter.go
+++ b/pkg/query/logical/trace/index_filter.go
@@ -23,6 +23,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -34,7 +35,7 @@ import (
 // Trace conditions are either entity or skipping index.
 // Trace creates explicit index rules for skipping index on all tags that 
don't belong to entity.
 // Returns min/max int64 values for the orderByTag if provided, otherwise 
returns math.MaxInt64, math.MinInt64.
-func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool,
+func buildFilter(criteria *modelv1.Criteria, schema logical.Schema, tagNames 
map[string]bool,
        entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName 
string, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        if criteria == nil {
@@ -43,15 +44,17 @@ func buildFilter(criteria *modelv1.Criteria, tagNames 
map[string]bool,
 
        switch criteria.GetExp().(type) {
        case *modelv1.Criteria_Condition:
-               return buildFilterFromCondition(criteria.GetCondition(), 
tagNames, entityDict, entity, traceIDTagName, orderByTag)
+               return buildFilterFromCondition(criteria.GetCondition(), 
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
        case *modelv1.Criteria_Le:
-               return buildFilterFromLogicalExpression(criteria.GetLe(), 
tagNames, entityDict, entity, traceIDTagName, orderByTag)
+               return buildFilterFromLogicalExpression(criteria.GetLe(), 
schema, tagNames, entityDict, entity, traceIDTagName, orderByTag)
        }
 
        return nil, nil, nil, nil, math.MinInt64, math.MaxInt64, 
logical.ErrInvalidCriteriaType
 }
 
-func parseConditionToFilter(cond *modelv1.Condition, entity 
[]*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, 
[][]*modelv1.TagValue, error) {
+func parseConditionToFilter(cond *modelv1.Condition, schema logical.Schema,
+       entity []*modelv1.TagValue, expr logical.LiteralExpr,
+) (index.Filter, [][]*modelv1.TagValue, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return &traceRangeFilter{op: "gt", tagName: cond.Name, cond: 
cond, expr: expr}, [][]*modelv1.TagValue{entity}, nil
@@ -72,8 +75,20 @@ func parseConditionToFilter(cond *modelv1.Condition, entity 
[]*modelv1.TagValue,
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
                return &traceFilter{op: "not_having", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, nil, errors.Errorf("in condition is 
not supported for array type")
+                       }
+               }
                return &traceFilter{op: "in", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
+               if schema != nil {
+                       tagSpec := schema.FindTagSpecByName(cond.Name)
+                       if tagSpec != nil && (tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_STRING_ARRAY || tagSpec.Spec.GetType() == 
databasev1.TagType_TAG_TYPE_INT_ARRAY) {
+                               return nil, nil, errors.Errorf("not in 
condition is not supported for array type")
+                       }
+               }
                return &traceFilter{op: "not_in", tagName: cond.Name}, 
[][]*modelv1.TagValue{entity}, nil
        }
        return nil, nil, errors.Errorf("unsupported condition operation: %v", 
cond.Op)
@@ -276,7 +291,7 @@ func extractTraceIDsFromCondition(cond *modelv1.Condition) 
[]string {
 }
 
 // buildFilterFromCondition handles single condition filtering and min/max 
extraction.
-func buildFilterFromCondition(cond *modelv1.Condition, tagNames 
map[string]bool, entityDict map[string]int,
+func buildFilterFromCondition(cond *modelv1.Condition, schema logical.Schema, 
tagNames map[string]bool, entityDict map[string]int,
        entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        var collectedTagNames []string
@@ -317,12 +332,12 @@ func buildFilterFromCondition(cond *modelv1.Condition, 
tagNames map[string]bool,
        if err != nil {
                return nil, nil, collectedTagNames, traceIDs, minVal, maxVal, 
err
        }
-       filter, entities, err := parseConditionToFilter(cond, entity, expr)
+       filter, entities, err := parseConditionToFilter(cond, schema, entity, 
expr)
        return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, 
err
 }
 
 // buildFilterFromLogicalExpression handles logical expression (AND/OR) 
filtering and min/max extraction.
-func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, tagNames 
map[string]bool, entityDict map[string]int,
+func buildFilterFromLogicalExpression(le *modelv1.LogicalExpression, schema 
logical.Schema, tagNames map[string]bool, entityDict map[string]int,
        entity []*modelv1.TagValue, traceIDTagName, orderByTag string,
 ) (index.Filter, [][]*modelv1.TagValue, []string, []string, int64, int64, 
error) {
        var collectedTagNames []string
@@ -334,17 +349,17 @@ func buildFilterFromLogicalExpression(le 
*modelv1.LogicalExpression, tagNames ma
                return nil, nil, nil, traceIDs, minVal, maxVal, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and 
right) of [%v] are empty", le)
        }
        if le.GetLeft() == nil {
-               return buildFilter(le.Right, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
+               return buildFilter(le.Right, schema, tagNames, entityDict, 
entity, traceIDTagName, orderByTag)
        }
        if le.GetRight() == nil {
-               return buildFilter(le.Left, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
+               return buildFilter(le.Left, schema, tagNames, entityDict, 
entity, traceIDTagName, orderByTag)
        }
 
-       left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err 
:= buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
+       left, leftEntities, leftTagNames, leftTraceIDs, leftMin, leftMax, err 
:= buildFilter(le.Left, schema, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
        if err != nil {
                return nil, nil, leftTagNames, leftTraceIDs, minVal, maxVal, err
        }
-       right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, 
err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
+       right, rightEntities, rightTagNames, rightTraceIDs, rightMin, rightMax, 
err := buildFilter(le.Right, schema, tagNames, entityDict, entity, 
traceIDTagName, orderByTag)
        if err != nil {
                return nil, nil, append(leftTagNames, rightTagNames...), 
append(leftTraceIDs, rightTraceIDs...), minVal, maxVal, err
        }
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go 
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 31f2cd4d..270d795d 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -115,7 +115,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s 
logical.Schema) (logical.Plan, er
        }
        plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs, minVal, maxVal)
        if uis.criteria != nil {
-               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, len(traceIDs) > 0, uis.traceIDTagName)
+               tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, 
entityDict, s, s, len(traceIDs) > 0, uis.traceIDTagName)
                if errFilter != nil {
                        return nil, errFilter
                }
@@ -189,7 +189,7 @@ func buildTraceFilter(criteria *modelv1.Criteria, s 
logical.Schema, entityDict m
                tagNames[tagName] = true
        }
 
-       filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := 
buildFilter(criteria, tagNames, entityDict, entity, traceIDTagName, orderByTag)
+       filter, entities, collectedTagNames, traceIDs, minVal, maxVal, err := 
buildFilter(criteria, s, tagNames, entityDict, entity, traceIDTagName, 
orderByTag)
        return filter, entities, collectedTagNames, traceIDs, minVal, maxVal, 
err
 }
 
diff --git a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json 
b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
index f1e994d7..cf03fabb 100644
--- a/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
+++ b/pkg/test/trace/testdata/index_rule_bindings/zipkin.json
@@ -4,9 +4,7 @@
         "group": "zipkinTrace"
     },
     "rules": [
-        "zipkin-duration",
-        "zipkin-timestamp",
-        "zipkin-service"
+        "zipkin-timestamp"
     ],
     "subject": {
         "catalog": "CATALOG_TRACE",
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-duration.json 
b/pkg/test/trace/testdata/index_rules/zipkin-duration.json
deleted file mode 100644
index f8c5ad76..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-duration.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "metadata": {
-        "name": "zipkin-duration",
-        "group": "zipkinTrace"
-    },
-    "tags": [
-        "local_endpoint_service_name",
-        "operation_name",
-        "kind",
-        "duration"
-    ],
-    "type": "TYPE_TREE",
-    "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/index_rules/zipkin-service.json 
b/pkg/test/trace/testdata/index_rules/zipkin-service.json
deleted file mode 100644
index 486d1512..00000000
--- a/pkg/test/trace/testdata/index_rules/zipkin-service.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "metadata": {
-        "name": "zipkin-service",
-        "group": "zipkinTrace"
-    },
-    "tags": [
-        "local_endpoint_service_name",
-        "remote_endpoint_service_name",
-        "operation_name",
-        "kind"
-    ],
-    "type": "TYPE_TREE",
-    "updated_at": "2021-04-15T01:30:15.01Z"
-}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/zipkin.json 
b/pkg/test/trace/testdata/traces/zipkin.json
index 7dd6d5a6..d90ee6af 100644
--- a/pkg/test/trace/testdata/traces/zipkin.json
+++ b/pkg/test/trace/testdata/traces/zipkin.json
@@ -24,10 +24,6 @@
             "name": "kind",
             "type": "TAG_TYPE_STRING"
         },
-        {
-            "name": "timestamp",
-            "type": "TAG_TYPE_TIMESTAMP"
-        },
         {
             "name": "duration",
             "type": "TAG_TYPE_INT"
@@ -63,6 +59,14 @@
         {
             "name": "debug",
             "type": "TAG_TYPE_INT"
+        },
+        {
+            "name": "query",
+            "type": "TAG_TYPE_STRING_ARRAY"
+        },
+        {
+            "name": "timestamp",
+            "type": "TAG_TYPE_TIMESTAMP"
         }
     ],
     "trace_id_tag_name": "trace_id",
diff --git a/test/cases/init.go b/test/cases/init.go
index 7f335558..9c4bb645 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -64,7 +64,7 @@ func Initialize(addr string, now time.Time) {
        // trace
        interval = 500 * time.Millisecond
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now, 
interval)
-       casestrace.Write(conn, "zipkin", now, interval)
+       casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now, 
interval)
        time.Sleep(5 * time.Second)
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", 
"sw_mixed_traces", now.Add(time.Minute), interval)
 }
diff --git a/test/cases/stream/data/input/err_in_arr.yaml 
b/test/cases/stream/data/input/err_in_arr.yaml
new file mode 100644
index 00000000..29b1a098
--- /dev/null
+++ b/test/cases/stream/data/input/err_in_arr.yaml
@@ -0,0 +1,32 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["default"]
+projection:
+  tagFamilies:
+  - name: "searchable"
+    tags: ["trace_id", "extended_tags"]
+  - name: "data"
+    tags: ["data_binary"]
+criteria:
+  condition:
+    name: "extended_tags"
+    op: "BINARY_OP_IN"
+    value:
+      strArray:
+        value: ["c", "b"]
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index b2376586..97ae14a5 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -89,4 +89,5 @@ var _ = g.DescribeTable("Scanning Streams", func(args 
helpers.Args) {
        g.Entry("multi-groups: update tag type", helpers.Args{Input: 
"multi_group_tag_type", Duration: 1 * time.Hour, IgnoreElementID: true}),
        g.Entry("multi-groups: sort duration", helpers.Args{Input: 
"multi_group_sort_duration", Duration: 1 * time.Hour, IgnoreElementID: true}),
        g.Entry("hybrid index", helpers.Args{Input: "hybrid_index", Duration: 1 
* time.Hour, IgnoreElementID: true}),
+       g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * 
time.Hour, WantErr: true}),
 )
diff --git a/test/cases/trace/data/input/err_in_arr.yml 
b/test/cases/trace/data/input/err_in_arr.yml
new file mode 100644
index 00000000..dc2a33dc
--- /dev/null
+++ b/test/cases/trace/data/input/err_in_arr.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+  index_rule_name: "zipkin-timestamp"
+  sort: "SORT_DESC"
+criteria:
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "query"
+        op: "BINARY_OP_IN"
+        value:
+          strArray:
+            value: ["SELECT * FROM users"]
+    right:
+      le:
+        op: "LOGICAL_OP_AND"
+        left:
+          condition:
+            name: "span_id"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "span_002"
+        right:
+          condition:
+            name: "operation_name"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "/db/query"
diff --git a/test/cases/trace/data/input/having_query_tag.yml 
b/test/cases/trace/data/input/having_query_tag.yml
new file mode 100644
index 00000000..59ffde14
--- /dev/null
+++ b/test/cases/trace/data/input/having_query_tag.yml
@@ -0,0 +1,50 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "zipkin"
+groups: ["zipkinTrace"]
+tag_projection: ["trace_id", "span_id", "operation_name", "query"]
+order_by:
+  index_rule_name: "zipkin-timestamp"
+  sort: "SORT_DESC"
+criteria:
+  le:
+    op: "LOGICAL_OP_AND"
+    left:
+      condition:
+        name: "query"
+        op: "BINARY_OP_HAVING"
+        value:
+          strArray:
+            value: ["SELECT * FROM users"]
+    right:
+      le:
+        op: "LOGICAL_OP_AND"
+        left:
+          condition:
+            name: "span_id"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "span_002"
+        right:
+          condition:
+            name: "operation_name"
+            op: "BINARY_OP_EQ"
+            value:
+              str:
+                value: "/db/query"
diff --git a/test/cases/trace/data/testdata/zipkin.json 
b/test/cases/trace/data/testdata/zipkin.json
index 19a636f3..9b21b412 100644
--- a/test/cases/trace/data/testdata/zipkin.json
+++ b/test/cases/trace/data/testdata/zipkin.json
@@ -70,6 +70,15 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "user_id=123",
+                        "limit=10",
+                        "offset=0"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_001_span_001"
@@ -145,6 +154,14 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "SELECT * FROM users",
+                        "WHERE id=123"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_001_span_002"
@@ -220,6 +237,15 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "order_id=456",
+                        "status=pending",
+                        "customer_id=789"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_002_span_003"
@@ -295,6 +321,14 @@
                 "int": {
                     "value": 0
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "GET order:456",
+                        "ttl=3600"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_002_span_004"
@@ -370,6 +404,13 @@
                 "int": {
                     "value": 1
                 }
+            },
+            {
+                "str_array": {
+                    "value": [
+                        "health_check=true"
+                    ]
+                }
             }
         ],
         "span": "zipkin_trace_003_span_005"
diff --git a/test/cases/trace/data/want/having_query_tag.yml 
b/test/cases/trace/data/want/having_query_tag.yml
new file mode 100644
index 00000000..dd3880cc
--- /dev/null
+++ b/test/cases/trace/data/want/having_query_tag.yml
@@ -0,0 +1,61 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+traces:
+  - spans:
+    - span: zipkin_trace_001_span_001
+      tags:
+      - key: span_id
+        value:
+          str:
+            value: span_001
+      - key: operation_name
+        value:
+          str:
+            value: /api/users
+      - key: query
+        value:
+          strArray:
+            value:
+            - user_id=123
+            - limit=10
+            - offset=0
+      - key: trace_id
+        value:
+          str:
+            value: zipkin_trace_001
+    - span: zipkin_trace_001_span_002
+      tags:
+      - key: span_id
+        value:
+          str:
+            value: span_002
+      - key: operation_name
+        value:
+          str:
+            value: /db/query
+      - key: query
+        value:
+          strArray:
+            value:
+            - SELECT * FROM users
+            - WHERE id=123
+      - key: trace_id
+        value:
+          str:
+            value: zipkin_trace_001
+            
\ No newline at end of file
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 570b5526..fd767504 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -51,4 +51,6 @@ var _ = g.DescribeTable("Scanning Traces", func(args 
helpers.Args) {
        g.Entry("filter by endpoint", helpers.Args{Input: 
"eq_endpoint_order_duration_asc", Duration: 1 * time.Hour}),
        g.Entry("order by timestamp limit 2", helpers.Args{Input: 
"order_timestamp_desc_limit", Duration: 1 * time.Hour}),
        g.Entry("filter by trace id and service unknown", helpers.Args{Input: 
"eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
+       g.Entry("filter by query", helpers.Args{Input: "having_query_tag", 
Duration: 1 * time.Hour}),
+       g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * 
time.Hour, WantErr: true}),
 )


Reply via email to