hanahmily commented on code in PR #491: URL: https://github.com/apache/skywalking-banyandb/pull/491#discussion_r1692361687
########## pkg/index/inverted/index_filter.go: ########## @@ -15,38 +15,129 @@ // specific language governing permissions and limitations // under the License. -package logical +package inverted import ( "bytes" "encoding/base64" "encoding/json" + "math" "strings" + "github.com/blugelabs/bluge" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -var errInvalidLogicalExpression = errors.New("invalid logical expression") +var ( + minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0]) + maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0]) + errInvalidLogicalExpression = errors.New("invalid logical expression") +) // GlobalIndexError represents a index rule is "global". // The local filter can't handle it. type GlobalIndexError struct { IndexRule *databasev1.IndexRule - Expr LiteralExpr + Expr logical.LiteralExpr } func (g GlobalIndexError) Error() string { return g.IndexRule.String() } +// BlugeQuery is a wrapper for bluge.Query. +type BlugeQuery struct { Review Comment: ```suggestion type Query struct { ``` ########## pkg/index/inverted/index_filter.go: ########## @@ -15,38 +15,129 @@ // specific language governing permissions and limitations // under the License. 
-package logical +package inverted import ( "bytes" "encoding/base64" "encoding/json" + "math" "strings" + "github.com/blugelabs/bluge" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -var errInvalidLogicalExpression = errors.New("invalid logical expression") +var ( + minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0]) + maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0]) + errInvalidLogicalExpression = errors.New("invalid logical expression") +) // GlobalIndexError represents a index rule is "global". // The local filter can't handle it. type GlobalIndexError struct { IndexRule *databasev1.IndexRule - Expr LiteralExpr + Expr logical.LiteralExpr } func (g GlobalIndexError) Error() string { return g.IndexRule.String() } +// BlugeQuery is a wrapper for bluge.Query. +type BlugeQuery struct { + bluge.Query +} + +// BuildLocalQuery returns blugeQuery for local indices. 
+func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int, + entity []*modelv1.TagValue, +) (*BlugeQuery, [][]*modelv1.TagValue, bool, error) { + if criteria == nil { + return nil, [][]*modelv1.TagValue{entity}, false, nil + } + switch criteria.GetExp().(type) { + case *modelv1.Criteria_Condition: + cond := criteria.GetCondition() + expr, parsedEntity, err := parseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, false, err + } + if parsedEntity != nil { + return nil, parsedEntity, false, nil + } + if ok, indexRule := schema.IndexDefined(cond.Name); ok { + return parseConditionToQuery(cond, indexRule, expr, entity) + } + return nil, nil, false, errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s", cond) + case *modelv1.Criteria_Le: + le := criteria.GetLe() + if le.GetLeft() == nil && le.GetRight() == nil { + return nil, nil, false, errors.WithMessagef(errInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) + } + if le.GetLeft() == nil { + return BuildLocalQuery(le.Right, schema, entityDict, entity) + } + if le.GetRight() == nil { + return BuildLocalQuery(le.Left, schema, entityDict, entity) + } + left, leftEntities, leftIsMatchAllQuery, err := BuildLocalQuery(le.Left, schema, entityDict, entity) + if err != nil { + return nil, nil, false, err + } + right, rightEntities, rightIsMatchAllQuery, err := BuildLocalQuery(le.Right, schema, entityDict, entity) + if err != nil { + return nil, nil, false, err + } + entities := parseEntities(le.Op, entity, leftEntities, rightEntities) + if entities == nil { + return nil, nil, false, nil + } + if left == nil && right == nil { + return nil, entities, false, nil + } + if leftIsMatchAllQuery && rightIsMatchAllQuery { + return &BlugeQuery{bluge.NewMatchAllQuery()}, entities, true, nil + } + switch le.Op { + case modelv1.LogicalExpression_LOGICAL_OP_AND: + query := bluge.NewBooleanQuery() + if left != nil { + 
+ query.AddMust(left.Query) + } + if right != nil { + query.AddMust(right.Query) + } + return &BlugeQuery{query}, entities, false, nil + case modelv1.LogicalExpression_LOGICAL_OP_OR: + if leftIsMatchAllQuery || rightIsMatchAllQuery { + return &BlugeQuery{bluge.NewMatchAllQuery()}, entities, true, nil + } + query := bluge.NewBooleanQuery() + query.SetMinShould(1) + if left != nil { + query.AddShould(left.Query) + } + if right != nil { + query.AddShould(right.Query) + } + return &BlugeQuery{query}, entities, false, nil + } + } + return nil, nil, false, logical.ErrInvalidCriteriaType +} + // BuildLocalFilter returns a new index.Filter for local indices. -func BuildLocalFilter(criteria *modelv1.Criteria, schema Schema, entityDict map[string]int, - entity []*modelv1.TagValue, mandatoryIndexRule bool, +func BuildLocalFilter(criteria *modelv1.Criteria, schema logical.Schema, Review Comment: Why do you move the filter here? Its implementation doesn't relate to bluge. It should be in the `pkg/query/logical/stream`. ########## pkg/index/inverted/index_filter.go: ########## Review Comment: Rename the file to `query.go` ########## pkg/index/index.go: ########## @@ -188,6 +190,8 @@ type SeriesStore interface { Store // Search returns a list of series that match the given matchers. Search(context.Context, []SeriesMatcher) ([]Series, error) + // Execute returns a posting list that matches the given query. + Execute(bluge.Query) (posting.List, error) Review Comment: Please encapsulate the `bluge` in the inverted pkg. ########## banyand/internal/storage/storage.go: ########## @@ -67,7 +68,7 @@ type SupplyTSDB[T TSTable] func() T // IndexDB is the interface of index database. 
type IndexDB interface { Write(docs index.Documents) error - Search(ctx context.Context, series []*pbv1.Series, filter index.Filter, order *model.OrderBy, preloadSize int) (pbv1.SeriesList, error) + Search(ctx context.Context, series []*pbv1.Series, query *inverted.BlugeQuery, filter index.Filter, order *model.OrderBy, preloadSize int) (pbv1.SeriesList, error) Review Comment: Is the `filter` still necessary? ########## pkg/index/inverted/index_filter.go: ########## @@ -15,38 +15,129 @@ // specific language governing permissions and limitations // under the License. -package logical +package inverted import ( "bytes" "encoding/base64" "encoding/json" + "math" "strings" + "github.com/blugelabs/bluge" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/posting" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) -var errInvalidLogicalExpression = errors.New("invalid logical expression") +var ( + minTerm = string([][]byte{convert.Int64ToBytes(math.MinInt64)}[0]) + maxTerm = string([][]byte{convert.Int64ToBytes(math.MaxInt64)}[0]) + errInvalidLogicalExpression = errors.New("invalid logical expression") +) // GlobalIndexError represents a index rule is "global". // The local filter can't handle it. type GlobalIndexError struct { IndexRule *databasev1.IndexRule - Expr LiteralExpr + Expr logical.LiteralExpr } func (g GlobalIndexError) Error() string { return g.IndexRule.String() } +// BlugeQuery is a wrapper for bluge.Query. +type BlugeQuery struct { Review Comment: Add `ToString` to this struct. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org