This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 198129c6 Optimize query performance of series index (#491)
198129c6 is described below

commit 198129c6901cb4cb5bf123aa19651ab701837736
Author: Huang Youliang <butterbright0...@gmail.com>
AuthorDate: Sat Aug 3 15:16:10 2024 +0800

    Optimize query performance of series index (#491)
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/index.go                  |  23 +-
 banyand/internal/storage/storage.go                |   3 +-
 banyand/measure/query.go                           |   2 +-
 banyand/stream/query.go                            |   4 +-
 .../logical/interface.go => convert/json.go}       |  45 +--
 pkg/index/index.go                                 |  22 +-
 pkg/index/inverted/inverted.go                     |  35 +-
 pkg/index/inverted/query.go                        | 431 +++++++++++++++++++++
 pkg/query/logical/common.go                        |  17 +-
 pkg/query/logical/expr.go                          |  10 +
 pkg/query/logical/expr_literal.go                  |  73 +++-
 pkg/query/logical/interface.go                     |   1 +
 .../measure/measure_plan_indexscan_local.go        |  12 +-
 pkg/query/logical/parser.go                        | 159 ++++++++
 pkg/query/logical/{ => stream}/index_filter.go     | 220 ++---------
 pkg/query/logical/stream/stream_plan_tag_filter.go |   2 +-
 pkg/query/logical/tag_filter.go                    |  19 +-
 pkg/query/model/model.go                           |   3 +-
 19 files changed, 790 insertions(+), 292 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 50884405..ba07da6f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
 - Add the stream query trace.
 - Add the topN query trace.
 - Introduce the round-robin selector to Liaison Node.
+- Optimize query performance of series index.
 
 ### Bugs
 
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index b8cd31cb..d5950c15 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -20,20 +20,17 @@ package storage
 import (
        "context"
        "path"
-       "strings"
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
-       "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 func (s *segment[T, O]) IndexDB() IndexDB {
@@ -200,24 +197,12 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
        }
 
        pl := seriesList.ToList()
-       if opts.Filter != nil && opts.Filter != logical.ENode {
+       if opts.Query != nil {
                var plFilter posting.List
                func() {
                        if tracer != nil {
                                span, _ := tracer.StartSpan(ctx, "filter")
-                               span.Tag("exp", opts.Filter.String())
-                               var projectionStrBuilder strings.Builder
-                               if len(opts.Projection) > 0 {
-                                       projectionStrBuilder.WriteString("[")
-                                       for i, p := range opts.Projection {
-                                               if i > 0 {
-                                                       
projectionStrBuilder.WriteString(", ")
-                                               }
-                                               
projectionStrBuilder.WriteRune(rune(p.IndexRuleID))
-                                       }
-                                       projectionStrBuilder.WriteString("]")
-                                       span.Tagf("projection", "%s", 
projectionStrBuilder.String())
-                               }
+                               span.Tag("exp", opts.Query.String())
                                defer func() {
                                        if err != nil {
                                                span.Error(err)
@@ -228,9 +213,7 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
                                        span.Stop()
                                }()
                        }
-                       if plFilter, err = opts.Filter.Execute(func(_ 
databasev1.IndexRule_Type) (index.Searcher, error) {
-                               return s.store, nil
-                       }, 0); err != nil {
+                       if plFilter, err = s.store.Execute(ctx, opts.Query); 
err != nil {
                                return
                        }
                        if plFilter == nil {
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 1ccf03d7..fb7e0af6 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -34,6 +34,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -66,7 +67,7 @@ type SupplyTSDB[T TSTable] func() T
 
 // IndexSearchOpts is the options for searching index.
 type IndexSearchOpts struct {
-       Filter      index.Filter
+       Query       *inverted.Query
        Order       *model.OrderBy
        Projection  []index.FieldKey
        PreloadSize int
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index ae37521a..fa94091e 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -194,7 +194,7 @@ func (s *measure) searchSeriesList(ctx context.Context, 
series []*pbv1.Series, m
        seriesFilter := roaring.NewPostingList()
        for i := range segments {
                sll, fieldResultList, err := segments[i].IndexDB().Search(ctx, 
series, storage.IndexSearchOpts{
-                       Filter:      mqo.Filter,
+                       Query:       mqo.Query,
                        Order:       mqo.Order,
                        PreloadSize: preloadSize,
                        Projection:  indexProjection,
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 9dff205f..0a5ebdc3 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -37,7 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
-       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+       logicalstream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
@@ -523,7 +523,7 @@ func (qr *queryResult) mergeByTimestamp() 
*model.StreamResult {
 func indexSearch(sqo model.StreamQueryOptions,
        tabs []*tsTable, seriesList pbv1.SeriesList,
 ) (posting.List, error) {
-       if sqo.Filter == nil || sqo.Filter == logical.ENode {
+       if sqo.Filter == nil || sqo.Filter == logicalstream.ENode {
                return nil, nil
        }
        result := roaring.NewPostingList()
diff --git a/pkg/query/logical/interface.go b/pkg/convert/json.go
similarity index 50%
copy from pkg/query/logical/interface.go
copy to pkg/convert/json.go
index dc002cad..22b250d9 100644
--- a/pkg/query/logical/interface.go
+++ b/pkg/convert/json.go
@@ -15,42 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logical
+package convert
 
-import (
-       "fmt"
-)
+import "encoding/json"
 
-// UnresolvedPlan denotes an logical expression.
-// It could be analyzed to a Plan(executable operation) with the Schema.
-type UnresolvedPlan interface {
-       Analyze(Schema) (Plan, error)
-}
-
-// Plan is the executable operation. It belongs to a execution tree.
-type Plan interface {
-       fmt.Stringer
-       Children() []Plan
-       Schema() Schema
-}
-
-// Expr represents a predicate in criteria.
-type Expr interface {
-       fmt.Stringer
-       DataType() int32
-       Equal(Expr) bool
-}
-
-// LiteralExpr allows getting raw data represented as bytes.
-type LiteralExpr interface {
-       Expr
-       Bytes() [][]byte
-}
-
-// ComparableExpr allows comparing Expr and Expr arrays.
-type ComparableExpr interface {
-       LiteralExpr
-       Compare(LiteralExpr) (int, bool)
-       BelongTo(LiteralExpr) bool
-       Contains(LiteralExpr) bool
+// JSONToString converts a JSON marshaler to its JSON string representation.
+func JSONToString(marshaler json.Marshaler) string {
+       bb, err := marshaler.MarshalJSON()
+       if err != nil {
+               return err.Error()
+       }
+       return string(bb)
 }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 5d5bf91e..8234f496 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -24,6 +24,8 @@ import (
        "fmt"
        "io"
 
+       "github.com/blugelabs/bluge"
+
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -93,7 +95,7 @@ func (r RangeOpts) Between(value []byte) int {
        return 0
 }
 
-// DocumentResult represents a document in a index.
+// DocumentResult represents a document in an index.
 type DocumentResult struct {
        Values      map[string][]byte
        SortedValue []byte
@@ -131,7 +133,7 @@ func (i *dummyIterator) Close() error {
        return nil
 }
 
-// Document represents a document in a index.
+// Document represents a document in an index.
 type Document struct {
        Fields       []Field
        EntityValues []byte
@@ -147,7 +149,7 @@ type Batch struct {
        Documents Documents
 }
 
-// Writer allows writing fields and docID in a document to a index.
+// Writer allows writing fields and docID in a document to an index.
 type Writer interface {
        Batch(batch Batch) error
 }
@@ -167,7 +169,14 @@ type Searcher interface {
        Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
 }
 
-// Store is an abstract of a index repository.
+// Query is an abstract of an index query.
+type Query interface {
+       bluge.Query
+       fmt.Stringer
+       Query() bluge.Query
+}
+
+// Store is an abstract of an index repository.
 type Store interface {
        io.Closer
        Writer
@@ -175,7 +184,7 @@ type Store interface {
        SizeOnDisk() int64
 }
 
-// Series represents a series in a index.
+// Series represents a series in an index.
 type Series struct {
        EntityValues []byte
        ID           common.SeriesID
@@ -185,7 +194,7 @@ func (s Series) String() string {
        return fmt.Sprintf("%s:%d", s.EntityValues, s.ID)
 }
 
-// SeriesDocument represents a series document in a index.
+// SeriesDocument represents a series document in an index.
 type SeriesDocument struct {
        Fields map[string][]byte
        Key    Series
@@ -196,6 +205,7 @@ type SeriesStore interface {
        Store
        // Search returns a list of series that match the given matchers.
        Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, 
error)
+       Execute(context.Context, Query) (posting.List, error)
 }
 
 // SeriesMatcherType represents the type of series matcher.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4d831fb2..a481b7f3 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package inverted implements a inverted index repository.
+// Package inverted implements an inverted index repository.
 package inverted
 
 import (
@@ -62,10 +62,11 @@ var (
        defaultProjection       = []string{docIDField}
 )
 
-var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
+// Analyzers is a map that associates each IndexRule_Analyzer type with a 
corresponding Analyzer.
+var Analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
 
 func init() {
-       analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
+       Analyzers = map[databasev1.IndexRule_Analyzer]*analysis.Analyzer{
                databasev1.IndexRule_ANALYZER_KEYWORD:  
analyzer.NewKeywordAnalyzer(),
                databasev1.IndexRule_ANALYZER_SIMPLE:   
analyzer.NewSimpleAnalyzer(),
                databasev1.IndexRule_ANALYZER_STANDARD: 
analyzer.NewStandardAnalyzer(),
@@ -74,7 +75,7 @@ func init() {
 
 var _ index.Store = (*store)(nil)
 
-// StoreOpts wraps options to create a inverted index repository.
+// StoreOpts wraps options to create an inverted index repository.
 type StoreOpts struct {
        Logger       *logger.Logger
        Path         string
@@ -124,7 +125,7 @@ func (s *store) Batch(batch index.Batch) error {
                                tf.StoreValue()
                        }
                        if f.Key.Analyzer != 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-                               tf = tf.WithAnalyzer(analyzers[f.Key.Analyzer])
+                               tf = tf.WithAnalyzer(Analyzers[f.Key.Analyzer])
                        }
                        doc.AddField(tf)
                }
@@ -153,7 +154,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
                        WithPersisterNapTimeMSec(int(opts.BatchWaitSec * 1000))
        }
        config := bluge.DefaultConfigWithIndexConfig(indexConfig)
-       config.DefaultSearchAnalyzer = 
analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
+       config.DefaultSearchAnalyzer = 
Analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
        config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
        w, err := bluge.OpenWriter(config)
        if err != nil {
@@ -271,7 +272,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
        if err != nil {
                return nil, err
        }
-       analyzer := analyzers[fieldKey.Analyzer]
+       analyzer := Analyzers[fieldKey.Analyzer]
        fk := fieldKey.Marshal()
        query := bluge.NewBooleanQuery()
        if fieldKey.HasSeriesID() {
@@ -309,6 +310,26 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        return
 }
 
+func (s *store) Execute(ctx context.Context, query index.Query) (posting.List, 
error) {
+       reader, err := s.writer.Reader()
+       if err != nil {
+               return nil, err
+       }
+       documentMatchIterator, err := reader.Search(ctx, 
bluge.NewAllMatches(query.Query()))
+       if err != nil {
+               return nil, err
+       }
+       iter := newBlugeMatchIterator(documentMatchIterator, reader, nil)
+       defer func() {
+               err = multierr.Append(err, iter.Close())
+       }()
+       list := roaring.NewPostingList()
+       for iter.Next() {
+               list.Insert(iter.Val().DocID)
+       }
+       return list, err
+}
+
 func (s *store) SizeOnDisk() int64 {
        _, bytes := s.writer.DirectoryStats()
        return int64(bytes)
diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go
new file mode 100644
index 00000000..e3f90fda
--- /dev/null
+++ b/pkg/index/inverted/query.go
@@ -0,0 +1,431 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "strings"
+
+       "github.com/blugelabs/bluge"
+       "github.com/blugelabs/bluge/search"
+       "github.com/pkg/errors"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+var (
+       minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0])
+       maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0])
+       minInf  = "-inf"
+       maxInf  = "+inf"
+)
+
+// GlobalIndexError represents a index rule is "global".
+// The local filter can't handle it.
+type GlobalIndexError struct {
+       IndexRule *databasev1.IndexRule
+       Expr      logical.LiteralExpr
+}
+
+func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
+
+// Query is a wrapper for bluge.Query.
+type Query struct {
+       query bluge.Query
+       node
+}
+
+// Searcher implements index.Query.
+func (q *Query) Searcher(i search.Reader, options search.SearcherOptions) 
(search.Searcher, error) {
+       return q.query.Searcher(i, options)
+}
+
+func (q *Query) String() string {
+       return q.node.String()
+}
+
+// Query implements index.Query.
+func (q *Query) Query() bluge.Query {
+       return q.query
+}
+
+// BuildLocalQuery returns blugeQuery for local indices.
+func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, 
entityDict map[string]int,
+       entity []*modelv1.TagValue,
+) (*Query, [][]*modelv1.TagValue, bool, error) {
+       if criteria == nil {
+               return nil, [][]*modelv1.TagValue{entity}, false, nil
+       }
+       switch criteria.GetExp().(type) {
+       case *modelv1.Criteria_Condition:
+               cond := criteria.GetCondition()
+               expr, parsedEntity, err := 
logical.ParseExprOrEntity(entityDict, entity, cond)
+               if err != nil {
+                       return nil, nil, false, err
+               }
+               if parsedEntity != nil {
+                       return nil, parsedEntity, false, nil
+               }
+               if ok, indexRule := schema.IndexDefined(cond.Name); ok {
+                       return parseConditionToQuery(cond, indexRule, expr, 
entity)
+               }
+               return nil, nil, false, 
errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s", 
cond)
+       case *modelv1.Criteria_Le:
+               le := criteria.GetLe()
+               if le.GetLeft() == nil && le.GetRight() == nil {
+                       return nil, nil, false, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and 
right) of [%v] are empty", criteria)
+               }
+               if le.GetLeft() == nil {
+                       return BuildLocalQuery(le.Right, schema, entityDict, 
entity)
+               }
+               if le.GetRight() == nil {
+                       return BuildLocalQuery(le.Left, schema, entityDict, 
entity)
+               }
+               left, leftEntities, leftIsMatchAllQuery, err := 
BuildLocalQuery(le.Left, schema, entityDict, entity)
+               if err != nil {
+                       return nil, nil, false, err
+               }
+               right, rightEntities, rightIsMatchAllQuery, err := 
BuildLocalQuery(le.Right, schema, entityDict, entity)
+               if err != nil {
+                       return nil, nil, false, err
+               }
+               entities := logical.ParseEntities(le.Op, entity, leftEntities, 
rightEntities)
+               if entities == nil {
+                       return nil, nil, false, nil
+               }
+               if left == nil && right == nil {
+                       return nil, entities, false, nil
+               }
+               if leftIsMatchAllQuery && rightIsMatchAllQuery {
+                       return &Query{
+                               query: bluge.NewMatchAllQuery(),
+                               node:  newMatchAllNode(),
+                       }, entities, true, nil
+               }
+               switch le.Op {
+               case modelv1.LogicalExpression_LOGICAL_OP_AND:
+                       query, node := bluge.NewBooleanQuery(), newMustNode()
+                       if left != nil {
+                               query.AddMust(left.query)
+                               node.Append(left.node)
+                       }
+                       if right != nil {
+                               query.AddMust(right.query)
+                               node.Append(right.node)
+                       }
+                       return &Query{query, node}, entities, false, nil
+               case modelv1.LogicalExpression_LOGICAL_OP_OR:
+                       if leftIsMatchAllQuery || rightIsMatchAllQuery {
+                               return &Query{
+                                       query: bluge.NewMatchAllQuery(),
+                                       node:  newMatchAllNode(),
+                               }, entities, true, nil
+                       }
+                       query, node := bluge.NewBooleanQuery(), newShouldNode()
+                       query.SetMinShould(1)
+                       if left != nil {
+                               query.AddShould(left.query)
+                               node.Append(left.node)
+                       }
+                       if right != nil {
+                               query.AddShould(right.query)
+                               node.Append(right.node)
+                       }
+                       return &Query{query, node}, entities, false, nil
+               }
+       }
+       return nil, nil, false, logical.ErrInvalidCriteriaType
+}
+
+func parseConditionToQuery(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule,
+       expr logical.LiteralExpr, entity []*modelv1.TagValue,
+) (*Query, [][]*modelv1.TagValue, bool, error) {
+       field := string(convert.Uint32ToBytes(indexRule.Metadata.Id))
+       b := expr.Bytes()
+       if len(b) < 1 {
+               return &Query{
+                       query: bluge.NewMatchAllQuery(),
+                       node:  newMatchAllNode(),
+               }, [][]*modelv1.TagValue{entity}, true, nil
+       }
+       term, str := string(b[0]), expr.String()
+       switch cond.Op {
+       case modelv1.Condition_BINARY_OP_GT:
+               query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false, 
false).SetField(field)
+               node := newTermRangeInclusiveNode(str, maxInf, false, false, 
indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_GE:
+               query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true, 
false).SetField(field)
+               node := newTermRangeInclusiveNode(str, maxInf, true, false, 
indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_LT:
+               query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, 
false).SetField(field)
+               node := newTermRangeInclusiveNode(minInf, str, false, false, 
indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_LE:
+               query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, 
true).SetField(field)
+               node := newTermRangeInclusiveNode(minInf, str, false, true, 
indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_EQ:
+               query := bluge.NewTermQuery(term).SetField(field)
+               node := newTermNode(str, indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_MATCH:
+               query := 
bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer])
+               node := newMatchNode(str, indexRule)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_NE:
+               query, node := bluge.NewBooleanQuery(), newMustNotNode()
+               query.AddMustNot(bluge.NewTermQuery(term).SetField(field))
+               node.SetSubNode(newTermNode(str, indexRule))
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_HAVING:
+               bb, elements := expr.Bytes(), expr.Elements()
+               query, node := bluge.NewBooleanQuery(), newMustNode()
+               for _, b := range bb {
+                       
query.AddMust(bluge.NewTermQuery(string(b)).SetField(field))
+               }
+               for _, e := range elements {
+                       node.Append(newTermNode(e, indexRule))
+               }
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_NOT_HAVING:
+               bb, elements := expr.Bytes(), expr.Elements()
+               subQuery, subNode := bluge.NewBooleanQuery(), newMustNode()
+               for _, b := range bb {
+                       
subQuery.AddMust(bluge.NewTermQuery(string(b)).SetField(field))
+               }
+               for _, e := range elements {
+                       subNode.Append(newTermNode(e, indexRule))
+               }
+               query, node := bluge.NewBooleanQuery(), newMustNotNode()
+               query.AddMustNot(subQuery)
+               node.SetSubNode(node)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_IN:
+               bb, elements := expr.Bytes(), expr.Elements()
+               query, node := bluge.NewBooleanQuery(), newShouldNode()
+               query.SetMinShould(1)
+               for _, b := range bb {
+                       
query.AddShould(bluge.NewTermQuery(string(b)).SetField(field))
+               }
+               for _, e := range elements {
+                       node.Append(newTermNode(e, indexRule))
+               }
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       case modelv1.Condition_BINARY_OP_NOT_IN:
+               bb, elements := expr.Bytes(), expr.Elements()
+               subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode()
+               subQuery.SetMinShould(1)
+               for _, b := range bb {
+                       
subQuery.AddShould(bluge.NewTermQuery(string(b)).SetField(field))
+               }
+               for _, e := range elements {
+                       subNode.Append(newTermNode(e, indexRule))
+               }
+               query, node := bluge.NewBooleanQuery(), newMustNotNode()
+               query.AddMustNot(subQuery)
+               node.SetSubNode(subNode)
+               return &Query{query, node}, [][]*modelv1.TagValue{entity}, 
false, nil
+       }
+       return nil, nil, false, 
errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses 
%v", cond)
+}
+
+type node interface {
+       fmt.Stringer
+}
+
+type mustNode struct {
+       subNodes []node
+}
+
+func newMustNode() *mustNode {
+       return &mustNode{
+               subNodes: make([]node, 0),
+       }
+}
+
+func (m *mustNode) Append(subNode node) {
+       m.subNodes = append(m.subNodes, subNode)
+}
+
+func (m *mustNode) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["must"] = m.subNodes
+       return json.Marshal(data)
+}
+
+func (m *mustNode) String() string {
+       return convert.JSONToString(m)
+}
+
+type shouldNode struct {
+       subNodes []node
+}
+
+func newShouldNode() *shouldNode {
+       return &shouldNode{
+               subNodes: make([]node, 0),
+       }
+}
+
+func (s *shouldNode) Append(subNode node) {
+       s.subNodes = append(s.subNodes, subNode)
+}
+
+func (s *shouldNode) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["should"] = s.subNodes
+       return json.Marshal(data)
+}
+
+func (s *shouldNode) String() string {
+       return convert.JSONToString(s)
+}
+
+type mustNotNode struct {
+       subNode node
+}
+
+func newMustNotNode() *mustNotNode {
+       return &mustNotNode{}
+}
+
+func (m *mustNotNode) SetSubNode(subNode node) {
+       m.subNode = subNode
+}
+
+func (m *mustNotNode) MarshalJSON() ([]byte, error) {
+       data := make(map[string]interface{}, 1)
+       data["mustNot"] = m.subNode
+       return json.Marshal(data)
+}
+
+func (m *mustNotNode) String() string {
+       return convert.JSONToString(m)
+}
+
+type matchAllNode struct{}
+
+func newMatchAllNode() *matchAllNode {
+       return &matchAllNode{}
+}
+
+func (m *matchAllNode) String() string {
+       return "matchAll"
+}
+
+type termRangeInclusiveNode struct {
+       indexRule    *databasev1.IndexRule
+       min          string
+       max          string
+       minInclusive bool
+       maxInclusive bool
+}
+
+func newTermRangeInclusiveNode(min, max string, minInclusive, maxInclusive 
bool, indexRule *databasev1.IndexRule) *termRangeInclusiveNode {
+       return &termRangeInclusiveNode{
+               indexRule:    indexRule,
+               min:          min,
+               max:          max,
+               minInclusive: minInclusive,
+               maxInclusive: maxInclusive,
+       }
+}
+
+func (t *termRangeInclusiveNode) MarshalJSON() ([]byte, error) {
+       inner := make(map[string]interface{}, 1)
+       var builder strings.Builder
+       if t.minInclusive {
+               builder.WriteString("[")
+       } else {
+               builder.WriteString("(")
+       }
+       builder.WriteString(t.min + " ")
+       builder.WriteString(t.max)
+       if t.maxInclusive {
+               builder.WriteString("]")
+       } else {
+               builder.WriteString(")")
+       }
+       inner["range"] = builder.String()
+       inner["index"] = t.indexRule.Metadata.Name + ":" + 
t.indexRule.Metadata.Group
+       data := make(map[string]interface{}, 1)
+       data["termRangeInclusive"] = inner
+       return json.Marshal(data)
+}
+
+func (t *termRangeInclusiveNode) String() string {
+       return convert.JSONToString(t)
+}
+
+type termNode struct {
+       indexRule *databasev1.IndexRule
+       term      string
+}
+
+func newTermNode(term string, indexRule *databasev1.IndexRule) *termNode {
+       return &termNode{
+               indexRule: indexRule,
+               term:      term,
+       }
+}
+
+func (t *termNode) MarshalJSON() ([]byte, error) {
+       inner := make(map[string]interface{}, 1)
+       inner["index"] = t.indexRule.Metadata.Name + ":" + 
t.indexRule.Metadata.Group
+       inner["value"] = t.term
+       data := make(map[string]interface{}, 1)
+       data["term"] = inner
+       return json.Marshal(data)
+}
+
+func (t *termNode) String() string {
+       return convert.JSONToString(t)
+}
+
+type matchNode struct {
+       indexRule *databasev1.IndexRule
+       match     string
+}
+
+func newMatchNode(match string, indexRule *databasev1.IndexRule) *matchNode {
+       return &matchNode{
+               indexRule: indexRule,
+               match:     match,
+       }
+}
+
+func (m *matchNode) MarshalJSON() ([]byte, error) {
+       inner := make(map[string]interface{}, 1)
+       inner["index"] = m.indexRule.Metadata.Name + ":" + 
m.indexRule.Metadata.Group
+       inner["value"] = m.match
+       inner["analyzer"] = 
databasev1.IndexRule_Analyzer_name[int32(m.indexRule.Analyzer)]
+       data := make(map[string]interface{}, 1)
+       data["match"] = inner
+       return json.Marshal(data)
+}
+
+func (m *matchNode) String() string {
+       return convert.JSONToString(m)
+}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 88b77220..d06129b9 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -24,12 +24,17 @@ import (
 )
 
 var (
-       errTagNotDefined             = errors.New("tag is not defined")
-       errUnsupportedConditionOp    = errors.New("unsupported condition 
operation")
-       errUnsupportedConditionValue = errors.New("unsupported condition value 
type")
-       errInvalidCriteriaType       = errors.New("invalid criteria type")
-       errIndexNotDefined           = errors.New("index is not define for the 
tag")
-       errIndexSortingUnsupported   = errors.New("index does not support 
sorting")
+       // ErrUnsupportedConditionOp indicates an unsupported condition 
operation.
+       ErrUnsupportedConditionOp = errors.New("unsupported condition 
operation")
+       // ErrUnsupportedConditionValue indicates an unsupported condition 
value type.
+       ErrUnsupportedConditionValue = errors.New("unsupported condition value 
type")
+       // ErrInvalidCriteriaType indicates an invalid criteria type.
+       ErrInvalidCriteriaType = errors.New("invalid criteria type")
+       // ErrInvalidLogicalExpression indicates an invalid logical expression.
+       ErrInvalidLogicalExpression = errors.New("invalid logical expression")
+       errTagNotDefined            = errors.New("tag is not defined")
+       errIndexNotDefined          = errors.New("index is not define for the 
tag")
+       errIndexSortingUnsupported  = errors.New("index does not support 
sorting")
 )
 
 // Tag represents the combination of  tag family and tag name.
diff --git a/pkg/query/logical/expr.go b/pkg/query/logical/expr.go
index ed0bdeda..fef36557 100644
--- a/pkg/query/logical/expr.go
+++ b/pkg/query/logical/expr.go
@@ -55,6 +55,11 @@ func (f *TagRef) String() string {
        return fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(), 
f.Spec.Spec.GetType().String())
 }
 
+// Elements returns a slice containing the string representation of TagRef.
+func (f *TagRef) Elements() []string {
+       return []string{fmt.Sprintf("#%s<%s>", f.Tag.GetCompoundName(), 
f.Spec.Spec.GetType().String())}
+}
+
 // NewTagRef returns a new TagRef.
 func NewTagRef(familyName, tagName string) *TagRef {
        return &TagRef{
@@ -85,6 +90,11 @@ func (f *FieldRef) String() string {
        return fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(), 
f.Spec.Spec.GetFieldType().String())
 }
 
+// Elements returns a slice containing the string representation of FieldRef.
+func (f *FieldRef) Elements() []string {
+       return []string{fmt.Sprintf("#%s<%s>", f.Spec.Spec.GetName(), 
f.Spec.Spec.GetFieldType().String())}
+}
+
 // DataType shows the type of the filed's value.
 func (f *FieldRef) DataType() int32 {
        if f.Spec == nil {
diff --git a/pkg/query/logical/expr_literal.go 
b/pkg/query/logical/expr_literal.go
index 0a6b5fd9..c21fdd08 100644
--- a/pkg/query/logical/expr_literal.go
+++ b/pkg/query/logical/expr_literal.go
@@ -39,6 +39,12 @@ type int64Literal struct {
        int64
 }
 
+func newInt64Literal(val int64) *int64Literal {
+       return &int64Literal{
+               int64: val,
+       }
+}
+
 func (i *int64Literal) Compare(other LiteralExpr) (int, bool) {
        if o, ok := other.(*int64Literal); ok {
                return int(i.int64 - o.int64), true
@@ -88,6 +94,10 @@ func (i *int64Literal) String() string {
        return strconv.FormatInt(i.int64, 10)
 }
 
+func (i *int64Literal) Elements() []string {
+       return []string{strconv.FormatInt(i.int64, 10)}
+}
+
 var (
        _ LiteralExpr    = (*int64ArrLiteral)(nil)
        _ ComparableExpr = (*int64ArrLiteral)(nil)
@@ -97,6 +107,12 @@ type int64ArrLiteral struct {
        arr []int64
 }
 
+func newInt64ArrLiteral(val []int64) *int64ArrLiteral {
+       return &int64ArrLiteral{
+               arr: val,
+       }
+}
+
 func (i *int64ArrLiteral) Compare(other LiteralExpr) (int, bool) {
        if o, ok := other.(*int64ArrLiteral); ok {
                return 0, slices.Equal(i.arr, o.arr)
@@ -161,6 +177,14 @@ func (i *int64ArrLiteral) String() string {
        return fmt.Sprintf("%v", i.arr)
 }
 
+func (i *int64ArrLiteral) Elements() []string {
+       var elements []string
+       for _, v := range i.arr {
+               elements = append(elements, strconv.FormatInt(v, 10))
+       }
+       return elements
+}
+
 var (
        _ LiteralExpr    = (*strLiteral)(nil)
        _ ComparableExpr = (*strLiteral)(nil)
@@ -223,6 +247,10 @@ func (s *strLiteral) String() string {
        return s.string
 }
 
+func (s *strLiteral) Elements() []string {
+       return []string{s.string}
+}
+
 var (
        _ LiteralExpr    = (*strArrLiteral)(nil)
        _ ComparableExpr = (*strArrLiteral)(nil)
@@ -232,6 +260,12 @@ type strArrLiteral struct {
        arr []string
 }
 
+func newStrArrLiteral(val []string) *strArrLiteral {
+       return &strArrLiteral{
+               arr: val,
+       }
+}
+
 func (s *strArrLiteral) Compare(other LiteralExpr) (int, bool) {
        if o, ok := other.(*strArrLiteral); ok {
                return 0, StringSlicesEqual(s.arr, o.arr)
@@ -296,34 +330,49 @@ func (s *strArrLiteral) String() string {
        return fmt.Sprintf("%v", s.arr)
 }
 
-type bytesLiteral struct {
+func (s *strArrLiteral) Elements() []string {
+       return s.arr
+}
+
+// BytesLiteral represents a wrapper for a slice of bytes.
+type BytesLiteral struct {
        bb []byte
 }
 
-func newBytesLiteral(bb []byte) *bytesLiteral {
-       return &bytesLiteral{bb: bb}
+// NewBytesLiteral creates a new instance of BytesLiteral with the provided 
slice of bytes.
+func NewBytesLiteral(bb []byte) *BytesLiteral {
+       return &BytesLiteral{bb: bb}
 }
 
-func (b *bytesLiteral) Bytes() [][]byte {
+// Bytes returns a 2D slice of bytes where the inner slice contains the byte 
slice stored in the BytesLiteral.
+func (b *BytesLiteral) Bytes() [][]byte {
        return [][]byte{b.bb}
 }
 
-func (b *bytesLiteral) Equal(expr Expr) bool {
-       if other, ok := expr.(*bytesLiteral); ok {
+// Equal checks if the current BytesLiteral is equal to the provided Expr.
+func (b *BytesLiteral) Equal(expr Expr) bool {
+       if other, ok := expr.(*BytesLiteral); ok {
                return bytes.Equal(other.bb, b.bb)
        }
 
        return false
 }
 
-func (b *bytesLiteral) DataType() int32 {
+// DataType returns the data type of BytesLiteral.
+func (b *BytesLiteral) DataType() int32 {
        return int32(databasev1.TagType_TAG_TYPE_DATA_BINARY)
 }
 
-func (b *bytesLiteral) String() string {
+// String converts the BytesLiteral's slice of bytes to a string 
representation.
+func (b *BytesLiteral) String() string {
        return hex.EncodeToString(b.bb)
 }
 
+// Elements returns a slice containing the string representation of the byte 
slice.
+func (b *BytesLiteral) Elements() []string {
+       return []string{hex.EncodeToString(b.bb)}
+}
+
 var (
        _               LiteralExpr    = (*nullLiteral)(nil)
        _               ComparableExpr = (*nullLiteral)(nil)
@@ -332,6 +381,10 @@ var (
 
 type nullLiteral struct{}
 
+func newNullLiteral() *nullLiteral {
+       return nullLiteralExpr
+}
+
 func (s nullLiteral) Compare(_ LiteralExpr) (int, bool) {
        return 0, false
 }
@@ -359,3 +412,7 @@ func (s nullLiteral) DataType() int32 {
 func (s nullLiteral) String() string {
        return "null"
 }
+
+func (s nullLiteral) Elements() []string {
+       return []string{"null"}
+}
diff --git a/pkg/query/logical/interface.go b/pkg/query/logical/interface.go
index dc002cad..0634473a 100644
--- a/pkg/query/logical/interface.go
+++ b/pkg/query/logical/interface.go
@@ -37,6 +37,7 @@ type Plan interface {
 // Expr represents a predicate in criteria.
 type Expr interface {
        fmt.Stringer
+       Elements() []string
        DataType() int32
        Equal(Expr) bool
 }
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 619d4971..a833023f 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -27,7 +27,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
@@ -87,7 +87,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                // fill AnyEntry by default
                entity[idx] = pbv1.AnyTagValue
        }
-       filter, entities, err := logical.BuildLocalFilter(uis.criteria, s, 
entityMap, entity, true)
+       query, entities, _, err := inverted.BuildLocalQuery(uis.criteria, s, 
entityMap, entity)
        if err != nil {
                return nil, err
        }
@@ -100,7 +100,7 @@ func (uis *unresolvedIndexScan) Analyze(s logical.Schema) 
(logical.Plan, error)
                projectionTagsRefs:   projTagsRefs,
                projectionFieldsRefs: projFieldRefs,
                metadata:             uis.metadata,
-               filter:               filter,
+               query:                query,
                entities:             entities,
                groupByEntity:        uis.groupByEntity,
                uis:                  uis,
@@ -114,7 +114,7 @@ var (
 )
 
 type localIndexScan struct {
-       filter               index.Filter
+       query                *inverted.Query
        schema               logical.Schema
        uis                  *unresolvedIndexScan
        order                *logical.OrderBy
@@ -155,7 +155,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit 
executor.MIterator, e
                Name:            i.metadata.GetName(),
                TimeRange:       &i.timeRange,
                Entities:        i.entities,
-               Filter:          i.filter,
+               Query:           i.query,
                OrderByType:     orderByType,
                Order:           orderBy,
                TagProjection:   i.projectionTags,
@@ -172,7 +172,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit 
executor.MIterator, e
 func (i *localIndexScan) String() string {
        return fmt.Sprintf("IndexScan: 
startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; 
projection=%s; order=%s;",
                i.timeRange.Start.Unix(), i.timeRange.End.Unix(), 
i.metadata.GetGroup(), i.metadata.GetName(),
-               i.filter, logical.FormatTagRefs(", ", i.projectionTagsRefs...), 
i.order)
+               i.query, logical.FormatTagRefs(", ", i.projectionTagsRefs...), 
i.order)
 }
 
 func (i *localIndexScan) Children() []logical.Plan {
diff --git a/pkg/query/logical/parser.go b/pkg/query/logical/parser.go
new file mode 100644
index 00000000..68f8ad47
--- /dev/null
+++ b/pkg/query/logical/parser.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logical
+
+import (
+       "github.com/pkg/errors"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// ParseExprOrEntity parses the condition and returns the literal expression 
or the entities.
+func ParseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, 
cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
+       entityIdx, ok := entityDict[cond.Name]
+       if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != 
modelv1.Condition_BINARY_OP_IN {
+               return nil, nil, errors.WithMessagef(ErrUnsupportedConditionOp, 
"tag belongs to the entity only supports EQ or IN operation in condition(%v)", 
cond)
+       }
+       switch v := cond.Value.Value.(type) {
+       case *modelv1.TagValue_Str:
+               if ok {
+                       parsedEntity := make([]*modelv1.TagValue, len(entity))
+                       copy(parsedEntity, entity)
+                       parsedEntity[entityIdx] = cond.Value
+                       return nil, [][]*modelv1.TagValue{parsedEntity}, nil
+               }
+               return str(v.Str.GetValue()), nil, nil
+       case *modelv1.TagValue_StrArray:
+               if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+                       entities := make([][]*modelv1.TagValue, 
len(v.StrArray.Value))
+                       for i, va := range v.StrArray.Value {
+                               parsedEntity := make([]*modelv1.TagValue, 
len(entity))
+                               copy(parsedEntity, entity)
+                               parsedEntity[entityIdx] = &modelv1.TagValue{
+                                       Value: &modelv1.TagValue_Str{
+                                               Str: &modelv1.Str{
+                                                       Value: va,
+                                               },
+                                       },
+                               }
+                               entities[i] = parsedEntity
+                       }
+                       return nil, entities, nil
+               }
+               return newStrArrLiteral(v.StrArray.GetValue()), nil, nil
+       case *modelv1.TagValue_Int:
+               if ok {
+                       parsedEntity := make([]*modelv1.TagValue, len(entity))
+                       copy(parsedEntity, entity)
+                       parsedEntity[entityIdx] = cond.Value
+                       return nil, [][]*modelv1.TagValue{parsedEntity}, nil
+               }
+               return newInt64Literal(v.Int.GetValue()), nil, nil
+       case *modelv1.TagValue_IntArray:
+               if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
+                       entities := make([][]*modelv1.TagValue, 
len(v.IntArray.Value))
+                       for i, va := range v.IntArray.Value {
+                               parsedEntity := make([]*modelv1.TagValue, 
len(entity))
+                               copy(parsedEntity, entity)
+                               parsedEntity[entityIdx] = &modelv1.TagValue{
+                                       Value: &modelv1.TagValue_Int{
+                                               Int: &modelv1.Int{
+                                                       Value: va,
+                                               },
+                                       },
+                               }
+                               entities[i] = parsedEntity
+                       }
+                       return nil, entities, nil
+               }
+               return newInt64ArrLiteral(v.IntArray.GetValue()), nil, nil
+       case *modelv1.TagValue_Null:
+               return newNullLiteral(), nil, nil
+       }
+       return nil, nil, errors.WithMessagef(ErrUnsupportedConditionValue, 
"index filter parses %v", cond)
+}
+
+// ParseEntities merges entities based on the logical operation.
+func ParseEntities(op modelv1.LogicalExpression_LogicalOp, input 
[]*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue {
+       count := len(input)
+       result := make([]*modelv1.TagValue, count)
+       anyEntity := func(entities [][]*modelv1.TagValue) bool {
+               for _, entity := range entities {
+                       for _, entry := range entity {
+                               if entry != pbv1.AnyTagValue {
+                                       return false
+                               }
+                       }
+               }
+               return true
+       }
+       leftAny := anyEntity(left)
+       rightAny := anyEntity(right)
+
+       mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right))
+
+       switch op {
+       case modelv1.LogicalExpression_LOGICAL_OP_AND:
+               if leftAny && !rightAny {
+                       return right
+               }
+               if !leftAny && rightAny {
+                       return left
+               }
+               mergedEntities = append(mergedEntities, left...)
+               mergedEntities = append(mergedEntities, right...)
+               for i := 0; i < count; i++ {
+                       entry := pbv1.AnyTagValue
+                       for j := 0; j < len(mergedEntities); j++ {
+                               e := mergedEntities[j][i]
+                               if e == pbv1.AnyTagValue {
+                                       continue
+                               }
+                               if entry == pbv1.AnyTagValue {
+                                       entry = e
+                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
+                                       return nil
+                               }
+                       }
+                       result[i] = entry
+               }
+       case modelv1.LogicalExpression_LOGICAL_OP_OR:
+               if leftAny {
+                       return left
+               }
+               if rightAny {
+                       return right
+               }
+               mergedEntities = append(mergedEntities, left...)
+               mergedEntities = append(mergedEntities, right...)
+               for i := 0; i < count; i++ {
+                       entry := pbv1.AnyTagValue
+                       for j := 0; j < len(mergedEntities); j++ {
+                               e := mergedEntities[j][i]
+                               if entry == pbv1.AnyTagValue {
+                                       entry = e
+                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
+                                       return mergedEntities
+                               }
+                       }
+                       result[i] = entry
+               }
+       }
+       return [][]*modelv1.TagValue{result}
+}
diff --git a/pkg/query/logical/index_filter.go 
b/pkg/query/logical/stream/index_filter.go
similarity index 66%
rename from pkg/query/logical/index_filter.go
rename to pkg/query/logical/stream/index_filter.go
index 8450c94e..533000f1 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/stream/index_filter.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logical
+package stream
 
 import (
        "bytes"
@@ -28,25 +28,15 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
-var errInvalidLogicalExpression = errors.New("invalid logical expression")
-
-// GlobalIndexError represents a index rule is "global".
-// The local filter can't handle it.
-type GlobalIndexError struct {
-       IndexRule *databasev1.IndexRule
-       Expr      LiteralExpr
-}
-
-func (g GlobalIndexError) Error() string { return g.IndexRule.String() }
-
-// BuildLocalFilter returns a new index.Filter for local indices.
-func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict 
map[string]int,
-       entity []*modelv1.TagValue, mandatoryIndexRule bool,
+// buildLocalFilter returns a new index.Filter for local indices.
+func buildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema,
+       entityDict map[string]int, entity []*modelv1.TagValue,
 ) (index.Filter, [][]*modelv1.TagValue, error) {
        if criteria == nil {
                return nil, [][]*modelv1.TagValue{entity}, nil
@@ -54,7 +44,7 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
        switch criteria.GetExp().(type) {
        case *modelv1.Criteria_Condition:
                cond := criteria.GetCondition()
-               expr, parsedEntity, err := parseExprOrEntity(entityDict, 
entity, cond)
+               expr, parsedEntity, err := 
logical.ParseExprOrEntity(entityDict, entity, cond)
                if err != nil {
                        return nil, nil, err
                }
@@ -62,31 +52,29 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                        return nil, parsedEntity, nil
                }
                if ok, indexRule := schema.IndexDefined(cond.Name); ok {
-                       return parseCondition(cond, indexRule, expr, entity)
-               } else if mandatoryIndexRule {
-                       return nil, nil, 
errors.Wrapf(errUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
+                       return parseConditionToFilter(cond, indexRule, expr, 
entity)
                }
                return ENode, [][]*modelv1.TagValue{entity}, nil
        case *modelv1.Criteria_Le:
                le := criteria.GetLe()
                if le.GetLeft() == nil && le.GetRight() == nil {
-                       return nil, nil, 
errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of 
[%v] are empty", criteria)
+                       return nil, nil, 
errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and 
right) of [%v] are empty", criteria)
                }
                if le.GetLeft() == nil {
-                       return BuildLocalFilter(le.Right, schema, entityDict, 
entity, mandatoryIndexRule)
+                       return buildLocalFilter(le.Right, schema, entityDict, 
entity)
                }
                if le.GetRight() == nil {
-                       return BuildLocalFilter(le.Left, schema, entityDict, 
entity, mandatoryIndexRule)
+                       return buildLocalFilter(le.Left, schema, entityDict, 
entity)
                }
-               left, leftEntities, err := BuildLocalFilter(le.Left, schema, 
entityDict, entity, mandatoryIndexRule)
+               left, leftEntities, err := buildLocalFilter(le.Left, schema, 
entityDict, entity)
                if err != nil {
                        return nil, nil, err
                }
-               right, rightEntities, err := BuildLocalFilter(le.Right, schema, 
entityDict, entity, mandatoryIndexRule)
+               right, rightEntities, err := buildLocalFilter(le.Right, schema, 
entityDict, entity)
                if err != nil {
                        return nil, nil, err
                }
-               entities := parseEntities(le.Op, entity, leftEntities, 
rightEntities)
+               entities := logical.ParseEntities(le.Op, entity, leftEntities, 
rightEntities)
                if entities == nil {
                        return nil, nil, nil
                }
@@ -110,10 +98,12 @@ func BuildLocalFilter(criteria *modelv1.Criteria, schema 
Schema, entityDict map[
                        return or, entities, nil
                }
        }
-       return nil, nil, errInvalidCriteriaType
+       return nil, nil, logical.ErrInvalidCriteriaType
 }
 
-func parseCondition(cond *modelv1.Condition, indexRule *databasev1.IndexRule, 
expr LiteralExpr, entity []*modelv1.TagValue) (index.Filter, 
[][]*modelv1.TagValue, error) {
+func parseConditionToFilter(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule,
+       expr logical.LiteralExpr, entity []*modelv1.TagValue,
+) (index.Filter, [][]*modelv1.TagValue, error) {
        switch cond.Op {
        case modelv1.Condition_BINARY_OP_GT:
                return newRange(indexRule, index.RangeOpts{
@@ -147,7 +137,7 @@ func parseCondition(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, ex
                }
                and := newAnd(l)
                for _, b := range bb {
-                       and.append(newEq(indexRule, newBytesLiteral(b)))
+                       and.append(newEq(indexRule, logical.NewBytesLiteral(b)))
                }
                return and, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_HAVING:
@@ -158,7 +148,7 @@ func parseCondition(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, ex
                }
                and := newAnd(l)
                for _, b := range bb {
-                       and.append(newEq(indexRule, newBytesLiteral(b)))
+                       and.append(newEq(indexRule, logical.NewBytesLiteral(b)))
                }
                return newNot(indexRule, and), [][]*modelv1.TagValue{entity}, 
nil
        case modelv1.Condition_BINARY_OP_IN:
@@ -169,7 +159,7 @@ func parseCondition(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, ex
                }
                or := newOr(l)
                for _, b := range bb {
-                       or.append(newEq(indexRule, newBytesLiteral(b)))
+                       or.append(newEq(indexRule, logical.NewBytesLiteral(b)))
                }
                return or, [][]*modelv1.TagValue{entity}, nil
        case modelv1.Condition_BINARY_OP_NOT_IN:
@@ -180,149 +170,11 @@ func parseCondition(cond *modelv1.Condition, indexRule 
*databasev1.IndexRule, ex
                }
                or := newOr(l)
                for _, b := range bb {
-                       or.append(newEq(indexRule, newBytesLiteral(b)))
+                       or.append(newEq(indexRule, logical.NewBytesLiteral(b)))
                }
                return newNot(indexRule, or), [][]*modelv1.TagValue{entity}, nil
        }
-       return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, "index 
filter parses %v", cond)
-}
-
-func parseExprOrEntity(entityDict map[string]int, entity []*modelv1.TagValue, 
cond *modelv1.Condition) (LiteralExpr, [][]*modelv1.TagValue, error) {
-       entityIdx, ok := entityDict[cond.Name]
-       if ok && cond.Op != modelv1.Condition_BINARY_OP_EQ && cond.Op != 
modelv1.Condition_BINARY_OP_IN {
-               return nil, nil, errors.WithMessagef(errUnsupportedConditionOp, 
"tag belongs to the entity only supports EQ or IN operation in condition(%v)", 
cond)
-       }
-       switch v := cond.Value.Value.(type) {
-       case *modelv1.TagValue_Str:
-               if ok {
-                       parsedEntity := make([]*modelv1.TagValue, len(entity))
-                       copy(parsedEntity, entity)
-                       parsedEntity[entityIdx] = cond.Value
-                       return nil, [][]*modelv1.TagValue{parsedEntity}, nil
-               }
-               return str(v.Str.GetValue()), nil, nil
-       case *modelv1.TagValue_StrArray:
-               if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
-                       entities := make([][]*modelv1.TagValue, 
len(v.StrArray.Value))
-                       for i, va := range v.StrArray.Value {
-                               parsedEntity := make([]*modelv1.TagValue, 
len(entity))
-                               copy(parsedEntity, entity)
-                               parsedEntity[entityIdx] = &modelv1.TagValue{
-                                       Value: &modelv1.TagValue_Str{
-                                               Str: &modelv1.Str{
-                                                       Value: va,
-                                               },
-                                       },
-                               }
-                               entities[i] = parsedEntity
-                       }
-                       return nil, entities, nil
-               }
-               return &strArrLiteral{
-                       arr: v.StrArray.GetValue(),
-               }, nil, nil
-       case *modelv1.TagValue_Int:
-               if ok {
-                       parsedEntity := make([]*modelv1.TagValue, len(entity))
-                       copy(parsedEntity, entity)
-                       parsedEntity[entityIdx] = cond.Value
-                       return nil, [][]*modelv1.TagValue{parsedEntity}, nil
-               }
-               return &int64Literal{
-                       int64: v.Int.GetValue(),
-               }, nil, nil
-       case *modelv1.TagValue_IntArray:
-               if ok && cond.Op == modelv1.Condition_BINARY_OP_IN {
-                       entities := make([][]*modelv1.TagValue, 
len(v.IntArray.Value))
-                       for i, va := range v.IntArray.Value {
-                               parsedEntity := make([]*modelv1.TagValue, 
len(entity))
-                               copy(parsedEntity, entity)
-                               parsedEntity[entityIdx] = &modelv1.TagValue{
-                                       Value: &modelv1.TagValue_Int{
-                                               Int: &modelv1.Int{
-                                                       Value: va,
-                                               },
-                                       },
-                               }
-                               entities[i] = parsedEntity
-                       }
-                       return nil, entities, nil
-               }
-               return &int64ArrLiteral{
-                       arr: v.IntArray.GetValue(),
-               }, nil, nil
-       case *modelv1.TagValue_Null:
-               return nullLiteralExpr, nil, nil
-       }
-       return nil, nil, errors.WithMessagef(errUnsupportedConditionValue, 
"index filter parses %v", cond)
-}
-
-func parseEntities(op modelv1.LogicalExpression_LogicalOp, input 
[]*modelv1.TagValue, left, right [][]*modelv1.TagValue) [][]*modelv1.TagValue {
-       count := len(input)
-       result := make([]*modelv1.TagValue, count)
-       anyEntity := func(entities [][]*modelv1.TagValue) bool {
-               for _, entity := range entities {
-                       for _, entry := range entity {
-                               if entry != pbv1.AnyTagValue {
-                                       return false
-                               }
-                       }
-               }
-               return true
-       }
-       leftAny := anyEntity(left)
-       rightAny := anyEntity(right)
-
-       mergedEntities := make([][]*modelv1.TagValue, 0, len(left)+len(right))
-
-       switch op {
-       case modelv1.LogicalExpression_LOGICAL_OP_AND:
-               if leftAny && !rightAny {
-                       return right
-               }
-               if !leftAny && rightAny {
-                       return left
-               }
-               mergedEntities = append(mergedEntities, left...)
-               mergedEntities = append(mergedEntities, right...)
-               for i := 0; i < count; i++ {
-                       entry := pbv1.AnyTagValue
-                       for j := 0; j < len(mergedEntities); j++ {
-                               e := mergedEntities[j][i]
-                               if e == pbv1.AnyTagValue {
-                                       continue
-                               }
-                               if entry == pbv1.AnyTagValue {
-                                       entry = e
-                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
-                                       return nil
-                               }
-                       }
-                       result[i] = entry
-               }
-       case modelv1.LogicalExpression_LOGICAL_OP_OR:
-               if leftAny {
-                       return left
-               }
-               if rightAny {
-                       return right
-               }
-               mergedEntities = append(mergedEntities, left...)
-               mergedEntities = append(mergedEntities, right...)
-               for i := 0; i < count; i++ {
-                       entry := pbv1.AnyTagValue
-                       for j := 0; j < len(mergedEntities); j++ {
-                               e := mergedEntities[j][i]
-                               if entry == pbv1.AnyTagValue {
-                                       entry = e
-                               } else if pbv1.MustCompareTagValue(entry, e) != 
0 {
-                                       return mergedEntities
-                               }
-                       }
-                       result[i] = entry
-               }
-       }
-       return [][]*modelv1.TagValue{result}
+       return nil, nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, 
"index filter parses %v", cond)
 }
 
 type fieldKey struct {
@@ -420,7 +272,7 @@ func (an *andNode) MarshalJSON() ([]byte, error) {
 }
 
 func (an *andNode) String() string {
-       return jsonToString(an)
+       return convert.JSONToString(an)
 }
 
 type orNode struct {
@@ -465,13 +317,13 @@ func (on *orNode) MarshalJSON() ([]byte, error) {
 }
 
 func (on *orNode) String() string {
-       return jsonToString(on)
+       return convert.JSONToString(on)
 }
 
 type leaf struct {
        index.Filter
        Key  fieldKey
-       Expr LiteralExpr
+       Expr logical.LiteralExpr
 }
 
 func (l *leaf) MarshalJSON() ([]byte, error) {
@@ -518,14 +370,14 @@ func (n *not) MarshalJSON() ([]byte, error) {
 }
 
 func (n *not) String() string {
-       return jsonToString(n)
+       return convert.JSONToString(n)
 }
 
 type eq struct {
        *leaf
 }
 
-func newEq(indexRule *databasev1.IndexRule, values LiteralExpr) *eq {
+func newEq(indexRule *databasev1.IndexRule, values logical.LiteralExpr) *eq {
        return &eq{
                leaf: &leaf{
                        Key:  newFieldKey(indexRule),
@@ -552,14 +404,14 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
 }
 
 func (eq *eq) String() string {
-       return jsonToString(eq)
+       return convert.JSONToString(eq)
 }
 
 type match struct {
        *leaf
 }
 
-func newMatch(indexRule *databasev1.IndexRule, values LiteralExpr) *match {
+func newMatch(indexRule *databasev1.IndexRule, values logical.LiteralExpr) 
*match {
        return &match{
                leaf: &leaf{
                        Key:  newFieldKey(indexRule),
@@ -591,7 +443,7 @@ func (match *match) MarshalJSON() ([]byte, error) {
 }
 
 func (match *match) String() string {
-       return jsonToString(match)
+       return convert.JSONToString(match)
 }
 
 type rangeOp struct {
@@ -642,15 +494,7 @@ func (r *rangeOp) MarshalJSON() ([]byte, error) {
 }
 
 func (r *rangeOp) String() string {
-       return jsonToString(r)
-}
-
-func jsonToString(marshaler json.Marshaler) string {
-       bb, err := marshaler.MarshalJSON()
-       if err != nil {
-               return err.Error()
-       }
-       return string(bb)
+       return convert.JSONToString(r)
 }
 
 var (
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go 
b/pkg/query/logical/stream/stream_plan_tag_filter.go
index a861dcfa..e7acf94e 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -55,7 +55,7 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) 
(logical.Plan, error)
                entity[idx] = pbv1.AnyTagValue
        }
        var err error
-       ctx.filter, ctx.entities, err = logical.BuildLocalFilter(uis.criteria, 
s, entityDict, entity, false)
+       ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s, 
entityDict, entity)
        if err != nil {
                return nil, err
        }
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index 0936e110..445ddfcc 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -25,6 +25,7 @@ import (
        "github.com/pkg/errors"
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
 var errUnsupportedLogicalOperation = errors.New("unsupported logical 
operation")
@@ -122,7 +123,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict 
map[string]int, index
                        return or, nil
                }
        }
-       return nil, errInvalidCriteriaType
+       return nil, ErrInvalidCriteriaType
 }
 
 func parseFilter(cond *modelv1.Condition, expr ComparableExpr) (TagFilter, 
error) {
@@ -158,7 +159,7 @@ func parseFilter(cond *modelv1.Condition, expr 
ComparableExpr) (TagFilter, error
        case modelv1.Condition_BINARY_OP_NOT_IN:
                return newNotTag(newInTag(cond.Name, expr)), nil
        default:
-               return nil, errors.WithMessagef(errUnsupportedConditionOp, "tag 
filter parses %v", cond)
+               return nil, errors.WithMessagef(ErrUnsupportedConditionOp, "tag 
filter parses %v", cond)
        }
 }
 
@@ -181,7 +182,7 @@ func parseExpr(value *modelv1.TagValue) (ComparableExpr, 
error) {
        case *modelv1.TagValue_Null:
                return nullLiteralExpr, nil
        }
-       return nil, errors.WithMessagef(errUnsupportedConditionValue, "tag 
filter parses %v", value)
+       return nil, errors.WithMessagef(ErrUnsupportedConditionValue, "tag 
filter parses %v", value)
 }
 
 // DummyFilter matches any predicate.
@@ -261,7 +262,7 @@ func (an *andLogicalNode) MarshalJSON() ([]byte, error) {
 }
 
 func (an *andLogicalNode) String() string {
-       return jsonToString(an)
+       return convert.JSONToString(an)
 }
 
 type orLogicalNode struct {
@@ -296,7 +297,7 @@ func (on *orLogicalNode) MarshalJSON() ([]byte, error) {
 }
 
 func (on *orLogicalNode) String() string {
-       return jsonToString(on)
+       return convert.JSONToString(on)
 }
 
 type tagLeaf struct {
@@ -338,7 +339,7 @@ func (n *notTag) MarshalJSON() ([]byte, error) {
 }
 
 func (n *notTag) String() string {
-       return jsonToString(n)
+       return convert.JSONToString(n)
 }
 
 type inTag struct {
@@ -390,7 +391,7 @@ func (eq *eqTag) MarshalJSON() ([]byte, error) {
 }
 
 func (eq *eqTag) String() string {
-       return jsonToString(eq)
+       return convert.JSONToString(eq)
 }
 
 type rangeOpts struct {
@@ -480,7 +481,7 @@ func (r *rangeTag) MarshalJSON() ([]byte, error) {
 }
 
 func (r *rangeTag) String() string {
-       return jsonToString(r)
+       return convert.JSONToString(r)
 }
 
 func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName 
string) (ComparableExpr, error) {
@@ -520,5 +521,5 @@ func (h *havingTag) MarshalJSON() ([]byte, error) {
 }
 
 func (h *havingTag) String() string {
-       return jsonToString(h)
+       return convert.JSONToString(h)
 }
diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 539508e4..20e98659 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -25,6 +25,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -72,7 +73,7 @@ const (
 
 // MeasureQueryOptions is the options of a measure query.
 type MeasureQueryOptions struct {
-       Filter          index.Filter
+       Query           *inverted.Query
        TimeRange       *timestamp.TimeRange
        Order           *OrderBy
        Name            string

Reply via email to