This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch query-criteria in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit eb2f07457a7eb264e832267add0a5e551aad498c Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Sep 2 01:18:12 2022 +0000 Need to be squahsed Add index_filter and tag_filter Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- api/proto/banyandb/measure/v1/query.pb.go | 6 +- api/proto/banyandb/measure/v1/query.pb.validate.go | 47 +- api/proto/banyandb/measure/v1/query.proto | 2 +- api/proto/banyandb/model/v1/query.pb.go | 364 +++++++++---- api/proto/banyandb/model/v1/query.pb.validate.go | 208 ++++++- api/proto/banyandb/model/v1/query.proto | 19 +- api/proto/banyandb/stream/v1/query.pb.go | 6 +- api/proto/banyandb/stream/v1/query.pb.validate.go | 47 +- api/proto/banyandb/stream/v1/query.proto | 2 +- .../openapi/banyandb/database/v1/rpc.swagger.json | 36 +- .../openapi/banyandb/measure/v1/rpc.swagger.json | 41 +- .../openapi/banyandb/stream/v1/rpc.swagger.json | 41 +- banyand/tsdb/series_seek.go | 15 +- banyand/tsdb/series_seek_filter.go | 106 +--- banyand/tsdb/series_seek_sort.go | 14 +- banyand/tsdb/seriesdb.go | 14 + docs/api-reference.md | 40 +- pkg/index/index.go | 9 +- pkg/index/inverted/inverted_test.go | 1 - pkg/index/testcases/service_name.go | 1 - pkg/index/tree.go | 367 ------------- pkg/query/logical/expr_literal.go | 169 +++--- pkg/query/logical/index_filter.go | 603 +++++++++++++++++++++ pkg/query/logical/interface.go | 23 +- pkg/query/logical/measure_analyzer.go | 52 +- pkg/query/logical/measure_plan_indexscan_local.go | 48 +- pkg/query/logical/schema.go | 14 +- pkg/query/logical/stream_analyzer.go | 56 +- pkg/query/logical/stream_plan.go | 30 - pkg/query/logical/stream_plan_indexscan_global.go | 22 - pkg/query/logical/stream_plan_indexscan_local.go | 145 ++--- pkg/query/logical/stream_plan_tag_filter.go | 188 ++----- pkg/query/logical/tag_filter.go | 417 ++++++++++++++ 33 files changed, 1941 insertions(+), 1212 deletions(-) diff --git a/api/proto/banyandb/measure/v1/query.pb.go b/api/proto/banyandb/measure/v1/query.pb.go index a602375..a016cf8 100644 --- a/api/proto/banyandb/measure/v1/query.pb.go +++ b/api/proto/banyandb/measure/v1/query.pb.go @@ -168,7 +168,7 @@ type QueryRequest struct { // time_range is a range query with begin/end time of entities in the timeunit of milliseconds. TimeRange *v1.TimeRange `protobuf:"bytes,2,opt,name=time_range,json=timeRange,proto3" json:"time_range,omitempty"` // tag_families are indexed. - Criteria []*v1.Criteria `protobuf:"bytes,4,rep,name=criteria,proto3" json:"criteria,omitempty"` + Criteria *v1.Criteria `protobuf:"bytes,4,opt,name=criteria,proto3" json:"criteria,omitempty"` // tag_projection can be used to select tags of the data points in the response TagProjection *v1.TagProjection `protobuf:"bytes,5,opt,name=tag_projection,json=tagProjection,proto3" json:"tag_projection,omitempty"` // field_projection can be used to select fields of the data points in the response @@ -236,7 +236,7 @@ func (x *QueryRequest) GetTimeRange() *v1.TimeRange { return nil } -func (x *QueryRequest) GetCriteria() []*v1.Criteria { +func (x *QueryRequest) GetCriteria() *v1.Criteria { if x != nil { return x.Criteria } @@ -632,7 +632,7 @@ var file_banyandb_measure_v1_query_proto_rawDesc = []byte{ 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x8a, 0x01, 0x02, 0x10, 0x01, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x63, 0x72, 0x69, 0x74, 0x65, - 0x72, 0x69, 0x61, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, + 0x72, 0x69, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x52, 0x08, 0x63, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x12, 0x51, 0x0a, 0x0e, 0x74, 0x61, 0x67, 0x5f, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, diff --git a/api/proto/banyandb/measure/v1/query.pb.validate.go b/api/proto/banyandb/measure/v1/query.pb.validate.go index cbcf0be..cf65e2f 100644 --- a/api/proto/banyandb/measure/v1/query.pb.validate.go +++ b/api/proto/banyandb/measure/v1/query.pb.validate.go @@ -467,38 +467,33 @@ func (m *QueryRequest) validate(all bool) error { } } - for idx, item := range m.GetCriteria() { - _, _ = idx, item - - if all { - switch v := interface{}(item).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - case interface{ Validate() error }: - if err := v.Validate(); err != nil { - errors = append(errors, QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } + if all { + switch v := interface{}(m.GetCriteria()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, QueryRequestValidationError{ + field: "Criteria", + reason: "embedded message failed validation", + cause: err, + }) } - } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { + case interface{ Validate() error }: if err := v.Validate(); err != nil { - return QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), + errors = append(errors, QueryRequestValidationError{ + field: "Criteria", reason: "embedded message failed validation", cause: err, - } + }) + } + } + } else if v, ok := interface{}(m.GetCriteria()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return QueryRequestValidationError{ + field: "Criteria", + reason: "embedded message failed validation", + cause: err, } } - } if m.GetTagProjection() == nil { diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto index 499040e..527b6ee 100644 --- a/api/proto/banyandb/measure/v1/query.proto +++ b/api/proto/banyandb/measure/v1/query.proto @@ -55,7 +55,7 @@ message QueryRequest { // time_range is a range query with begin/end time of entities in the timeunit of milliseconds. model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true]; // tag_families are indexed. - repeated model.v1.Criteria criteria = 4; + model.v1.Criteria criteria = 4; // tag_projection can be used to select tags of the data points in the response model.v1.TagProjection tag_projection = 5 [(validate.rules).message.required = true]; message FieldProjection { diff --git a/api/proto/banyandb/model/v1/query.pb.go b/api/proto/banyandb/model/v1/query.pb.go index e4248ed..453cce5 100644 --- a/api/proto/banyandb/model/v1/query.pb.go +++ b/api/proto/banyandb/model/v1/query.pb.go @@ -171,6 +171,55 @@ func (Condition_BinaryOp) EnumDescriptor() ([]byte, []int) { return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{2, 0} } +type LogicalExpression_LogicalOp int32 + +const ( + LogicalExpression_LOGICAL_OP_UNSPECIFIED LogicalExpression_LogicalOp = 0 + LogicalExpression_LOGICAL_OP_AND LogicalExpression_LogicalOp = 1 + LogicalExpression_LOGICAL_OP_OR LogicalExpression_LogicalOp = 2 +) + +// Enum value maps for LogicalExpression_LogicalOp. +var ( + LogicalExpression_LogicalOp_name = map[int32]string{ + 0: "LOGICAL_OP_UNSPECIFIED", + 1: "LOGICAL_OP_AND", + 2: "LOGICAL_OP_OR", + } + LogicalExpression_LogicalOp_value = map[string]int32{ + "LOGICAL_OP_UNSPECIFIED": 0, + "LOGICAL_OP_AND": 1, + "LOGICAL_OP_OR": 2, + } +) + +func (x LogicalExpression_LogicalOp) Enum() *LogicalExpression_LogicalOp { + p := new(LogicalExpression_LogicalOp) + *p = x + return p +} + +func (x LogicalExpression_LogicalOp) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (LogicalExpression_LogicalOp) Descriptor() protoreflect.EnumDescriptor { + return file_banyandb_model_v1_query_proto_enumTypes[2].Descriptor() +} + +func (LogicalExpression_LogicalOp) Type() protoreflect.EnumType { + return &file_banyandb_model_v1_query_proto_enumTypes[2] +} + +func (x LogicalExpression_LogicalOp) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use LogicalExpression_LogicalOp.Descriptor instead. +func (LogicalExpression_LogicalOp) EnumDescriptor() ([]byte, []int) { + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{4, 0} +} + // Pair is the building block of a record which is equivalent to a key-value pair. // In the context of Trace, it could be metadata of a trace such as service_name, service_instance, etc. // Besides, other tags are organized in key-value pair in the underlying storage layer. @@ -357,8 +406,11 @@ type Criteria struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TagFamilyName string `protobuf:"bytes,1,opt,name=tag_family_name,json=tagFamilyName,proto3" json:"tag_family_name,omitempty"` - Conditions []*Condition `protobuf:"bytes,2,rep,name=conditions,proto3" json:"conditions,omitempty"` + // Types that are assignable to Exp: + // + // *Criteria_Le + // *Criteria_Condition + Exp isCriteria_Exp `protobuf_oneof:"exp"` } func (x *Criteria) Reset() { @@ -393,16 +445,104 @@ func (*Criteria) Descriptor() ([]byte, []int) { return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{3} } -func (x *Criteria) GetTagFamilyName() string { +func (m *Criteria) GetExp() isCriteria_Exp { + if m != nil { + return m.Exp + } + return nil +} + +func (x *Criteria) GetLe() *LogicalExpression { + if x, ok := x.GetExp().(*Criteria_Le); ok { + return x.Le + } + return nil +} + +func (x *Criteria) GetCondition() *Condition { + if x, ok := x.GetExp().(*Criteria_Condition); ok { + return x.Condition + } + return nil +} + +type isCriteria_Exp interface { + isCriteria_Exp() +} + +type Criteria_Le struct { + Le *LogicalExpression `protobuf:"bytes,1,opt,name=le,proto3,oneof"` +} + +type Criteria_Condition struct { + Condition *Condition `protobuf:"bytes,2,opt,name=condition,proto3,oneof"` +} + +func (*Criteria_Le) isCriteria_Exp() {} + +func (*Criteria_Condition) isCriteria_Exp() {} + +// LogicalExpression supports logical operation +type LogicalExpression struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // op is a logial operation + Op LogicalExpression_LogicalOp `protobuf:"varint,1,opt,name=op,proto3,enum=banyandb.model.v1.LogicalExpression_LogicalOp" json:"op,omitempty"` + Left *Criteria `protobuf:"bytes,2,opt,name=left,proto3" json:"left,omitempty"` + Right *Criteria `protobuf:"bytes,3,opt,name=right,proto3" json:"right,omitempty"` +} + +func (x *LogicalExpression) Reset() { + *x = LogicalExpression{} + if protoimpl.UnsafeEnabled { + mi := &file_banyandb_model_v1_query_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogicalExpression) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogicalExpression) ProtoMessage() {} + +func (x *LogicalExpression) ProtoReflect() protoreflect.Message { + mi := &file_banyandb_model_v1_query_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogicalExpression.ProtoReflect.Descriptor instead. +func (*LogicalExpression) Descriptor() ([]byte, []int) { + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{4} +} + +func (x *LogicalExpression) GetOp() LogicalExpression_LogicalOp { if x != nil { - return x.TagFamilyName + return x.Op } - return "" + return LogicalExpression_LOGICAL_OP_UNSPECIFIED +} + +func (x *LogicalExpression) GetLeft() *Criteria { + if x != nil { + return x.Left + } + return nil } -func (x *Criteria) GetConditions() []*Condition { +func (x *LogicalExpression) GetRight() *Criteria { if x != nil { - return x.Conditions + return x.Right } return nil } @@ -421,7 +561,7 @@ type QueryOrder struct { func (x *QueryOrder) Reset() { *x = QueryOrder{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_model_v1_query_proto_msgTypes[4] + mi := &file_banyandb_model_v1_query_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -434,7 +574,7 @@ func (x *QueryOrder) String() string { func (*QueryOrder) ProtoMessage() {} func (x *QueryOrder) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_model_v1_query_proto_msgTypes[4] + mi := &file_banyandb_model_v1_query_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -447,7 +587,7 @@ func (x *QueryOrder) ProtoReflect() protoreflect.Message { // Deprecated: Use QueryOrder.ProtoReflect.Descriptor instead. func (*QueryOrder) Descriptor() ([]byte, []int) { - return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{4} + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{5} } func (x *QueryOrder) GetIndexRuleName() string { @@ -476,7 +616,7 @@ type TagProjection struct { func (x *TagProjection) Reset() { *x = TagProjection{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_model_v1_query_proto_msgTypes[5] + mi := &file_banyandb_model_v1_query_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -489,7 +629,7 @@ func (x *TagProjection) String() string { func (*TagProjection) ProtoMessage() {} func (x *TagProjection) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_model_v1_query_proto_msgTypes[5] + mi := &file_banyandb_model_v1_query_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -502,7 +642,7 @@ func (x *TagProjection) ProtoReflect() protoreflect.Message { // Deprecated: Use TagProjection.ProtoReflect.Descriptor instead. func (*TagProjection) Descriptor() ([]byte, []int) { - return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{5} + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{6} } func (x *TagProjection) GetTagFamilies() []*TagProjection_TagFamily { @@ -526,7 +666,7 @@ type TimeRange struct { func (x *TimeRange) Reset() { *x = TimeRange{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_model_v1_query_proto_msgTypes[6] + mi := &file_banyandb_model_v1_query_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -539,7 +679,7 @@ func (x *TimeRange) String() string { func (*TimeRange) ProtoMessage() {} func (x *TimeRange) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_model_v1_query_proto_msgTypes[6] + mi := &file_banyandb_model_v1_query_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -552,7 +692,7 @@ func (x *TimeRange) ProtoReflect() protoreflect.Message { // Deprecated: Use TimeRange.ProtoReflect.Descriptor instead. func (*TimeRange) Descriptor() ([]byte, []int) { - return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{6} + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{7} } func (x *TimeRange) GetBegin() *timestamppb.Timestamp { @@ -581,7 +721,7 @@ type TagProjection_TagFamily struct { func (x *TagProjection_TagFamily) Reset() { *x = TagProjection_TagFamily{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_model_v1_query_proto_msgTypes[7] + mi := &file_banyandb_model_v1_query_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -594,7 +734,7 @@ func (x *TagProjection_TagFamily) String() string { func (*TagProjection_TagFamily) ProtoMessage() {} func (x *TagProjection_TagFamily) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_model_v1_query_proto_msgTypes[7] + mi := &file_banyandb_model_v1_query_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -607,7 +747,7 @@ func (x *TagProjection_TagFamily) ProtoReflect() protoreflect.Message { // Deprecated: Use TagProjection_TagFamily.ProtoReflect.Descriptor instead. func (*TagProjection_TagFamily) Descriptor() ([]byte, []int) { - return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{5, 0} + return file_banyandb_model_v1_query_proto_rawDescGZIP(), []int{6, 0} } func (x *TagProjection_TagFamily) GetName() string { @@ -670,48 +810,66 @@ var file_banyandb_model_v1_query_proto_rawDesc = []byte{ 0x4f, 0x50, 0x5f, 0x49, 0x4e, 0x10, 0x09, 0x12, 0x14, 0x0a, 0x10, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x49, 0x4e, 0x10, 0x0a, 0x12, 0x13, 0x0a, 0x0f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x5f, 0x4f, 0x50, 0x5f, 0x4d, 0x41, 0x54, 0x43, 0x48, - 0x10, 0x0b, 0x22, 0x70, 0x0a, 0x08, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x12, 0x26, - 0x0a, 0x0f, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, - 0x6c, 0x79, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x3c, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, + 0x10, 0x0b, 0x22, 0x87, 0x01, 0x0a, 0x08, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x12, + 0x36, 0x0a, 0x02, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x62, 0x61, + 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, + 0x4c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x45, 0x78, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x48, 0x00, 0x52, 0x02, 0x6c, 0x65, 0x12, 0x3c, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x43, - 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x61, 0x0a, 0x0a, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, - 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x72, 0x75, 0x6c, 0x65, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x69, 0x6e, 0x64, - 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x04, 0x73, 0x6f, - 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, - 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x72, - 0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x9d, 0x01, 0x0a, 0x0d, 0x54, 0x61, 0x67, 0x50, - 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x57, 0x0a, 0x0c, 0x74, 0x61, 0x67, - 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, - 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x67, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x42, 0x08, 0xfa, 0x42, 0x05, - 0x92, 0x01, 0x02, 0x08, 0x01, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, - 0x65, 0x73, 0x1a, 0x33, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, - 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, - 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, 0x65, 0x52, - 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, - 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, - 0x03, 0x65, 0x6e, 0x64, 0x2a, 0x39, 0x0a, 0x04, 0x53, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x10, - 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, - 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x44, 0x45, 0x53, 0x43, 0x10, - 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x41, 0x53, 0x43, 0x10, 0x02, 0x42, - 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, - 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, - 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, - 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, - 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, - 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x05, 0x0a, 0x03, 0x65, 0x78, 0x70, 0x22, 0x87, 0x02, 0x0a, + 0x11, 0x4c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x45, 0x78, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x02, 0x6f, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2e, + 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, + 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x45, 0x78, 0x70, 0x72, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x4c, 0x6f, 0x67, 0x69, 0x63, 0x61, 0x6c, 0x4f, 0x70, 0x52, 0x02, + 0x6f, 0x70, 0x12, 0x2f, 0x0a, 0x04, 0x6c, 0x65, 0x66, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, + 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x52, 0x04, 0x6c, + 0x65, 0x66, 0x74, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x69, 0x67, 0x68, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, + 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x52, + 0x05, 0x72, 0x69, 0x67, 0x68, 0x74, 0x22, 0x4e, 0x0a, 0x09, 0x4c, 0x6f, 0x67, 0x69, 0x63, 0x61, + 0x6c, 0x4f, 0x70, 0x12, 0x1a, 0x0a, 0x16, 0x4c, 0x4f, 0x47, 0x49, 0x43, 0x41, 0x4c, 0x5f, 0x4f, + 0x50, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x12, 0x0a, 0x0e, 0x4c, 0x4f, 0x47, 0x49, 0x43, 0x41, 0x4c, 0x5f, 0x4f, 0x50, 0x5f, 0x41, 0x4e, + 0x44, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x4c, 0x4f, 0x47, 0x49, 0x43, 0x41, 0x4c, 0x5f, 0x4f, + 0x50, 0x5f, 0x4f, 0x52, 0x10, 0x02, 0x22, 0x61, 0x0a, 0x0a, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, + 0x72, 0x64, 0x65, 0x72, 0x12, 0x26, 0x0a, 0x0f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x72, 0x75, + 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x69, + 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2b, 0x0a, 0x04, + 0x73, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x62, 0x61, 0x6e, + 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x53, + 0x6f, 0x72, 0x74, 0x52, 0x04, 0x73, 0x6f, 0x72, 0x74, 0x22, 0x9d, 0x01, 0x0a, 0x0d, 0x54, 0x61, + 0x67, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x57, 0x0a, 0x0c, 0x74, + 0x61, 0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x2a, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, + 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x67, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x42, 0x08, 0xfa, + 0x42, 0x05, 0x92, 0x01, 0x02, 0x08, 0x01, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, + 0x6c, 0x69, 0x65, 0x73, 0x1a, 0x33, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, + 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x6b, 0x0a, 0x09, 0x54, 0x69, 0x6d, + 0x65, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x05, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x2a, 0x39, 0x0a, 0x04, 0x53, 0x6f, 0x72, 0x74, 0x12, 0x14, + 0x0a, 0x10, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x44, 0x45, 0x53, + 0x43, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x4f, 0x52, 0x54, 0x5f, 0x41, 0x53, 0x43, 0x10, + 0x02, 0x42, 0x6c, 0x0a, 0x27, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, + 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, + 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x5a, 0x41, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, + 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, + 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, + 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2f, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -726,37 +884,43 @@ func file_banyandb_model_v1_query_proto_rawDescGZIP() []byte { return file_banyandb_model_v1_query_proto_rawDescData } -var file_banyandb_model_v1_query_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_banyandb_model_v1_query_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_banyandb_model_v1_query_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_banyandb_model_v1_query_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_banyandb_model_v1_query_proto_goTypes = []interface{}{ - (Sort)(0), // 0: banyandb.model.v1.Sort - (Condition_BinaryOp)(0), // 1: banyandb.model.v1.Condition.BinaryOp - (*Tag)(nil), // 2: banyandb.model.v1.Tag - (*TagFamily)(nil), // 3: banyandb.model.v1.TagFamily - (*Condition)(nil), // 4: banyandb.model.v1.Condition - (*Criteria)(nil), // 5: banyandb.model.v1.Criteria - (*QueryOrder)(nil), // 6: banyandb.model.v1.QueryOrder - (*TagProjection)(nil), // 7: banyandb.model.v1.TagProjection - (*TimeRange)(nil), // 8: banyandb.model.v1.TimeRange - (*TagProjection_TagFamily)(nil), // 9: banyandb.model.v1.TagProjection.TagFamily - (*TagValue)(nil), // 10: banyandb.model.v1.TagValue - (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp + (Sort)(0), // 0: banyandb.model.v1.Sort + (Condition_BinaryOp)(0), // 1: banyandb.model.v1.Condition.BinaryOp + (LogicalExpression_LogicalOp)(0), // 2: banyandb.model.v1.LogicalExpression.LogicalOp + (*Tag)(nil), // 3: banyandb.model.v1.Tag + (*TagFamily)(nil), // 4: banyandb.model.v1.TagFamily + (*Condition)(nil), // 5: banyandb.model.v1.Condition + (*Criteria)(nil), // 6: banyandb.model.v1.Criteria + (*LogicalExpression)(nil), // 7: banyandb.model.v1.LogicalExpression + (*QueryOrder)(nil), // 8: banyandb.model.v1.QueryOrder + (*TagProjection)(nil), // 9: banyandb.model.v1.TagProjection + (*TimeRange)(nil), // 10: banyandb.model.v1.TimeRange + (*TagProjection_TagFamily)(nil), // 11: banyandb.model.v1.TagProjection.TagFamily + (*TagValue)(nil), // 12: banyandb.model.v1.TagValue + (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp } var file_banyandb_model_v1_query_proto_depIdxs = []int32{ - 10, // 0: banyandb.model.v1.Tag.value:type_name -> banyandb.model.v1.TagValue - 2, // 1: banyandb.model.v1.TagFamily.tags:type_name -> banyandb.model.v1.Tag + 12, // 0: banyandb.model.v1.Tag.value:type_name -> banyandb.model.v1.TagValue + 3, // 1: banyandb.model.v1.TagFamily.tags:type_name -> banyandb.model.v1.Tag 1, // 2: banyandb.model.v1.Condition.op:type_name -> banyandb.model.v1.Condition.BinaryOp - 10, // 3: banyandb.model.v1.Condition.value:type_name -> banyandb.model.v1.TagValue - 4, // 4: banyandb.model.v1.Criteria.conditions:type_name -> banyandb.model.v1.Condition - 0, // 5: banyandb.model.v1.QueryOrder.sort:type_name -> banyandb.model.v1.Sort - 9, // 6: banyandb.model.v1.TagProjection.tag_families:type_name -> banyandb.model.v1.TagProjection.TagFamily - 11, // 7: banyandb.model.v1.TimeRange.begin:type_name -> google.protobuf.Timestamp - 11, // 8: banyandb.model.v1.TimeRange.end:type_name -> google.protobuf.Timestamp - 9, // [9:9] is the sub-list for method output_type - 9, // [9:9] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 12, // 3: banyandb.model.v1.Condition.value:type_name -> banyandb.model.v1.TagValue + 7, // 4: banyandb.model.v1.Criteria.le:type_name -> banyandb.model.v1.LogicalExpression + 5, // 5: banyandb.model.v1.Criteria.condition:type_name -> banyandb.model.v1.Condition + 2, // 6: banyandb.model.v1.LogicalExpression.op:type_name -> banyandb.model.v1.LogicalExpression.LogicalOp + 6, // 7: banyandb.model.v1.LogicalExpression.left:type_name -> banyandb.model.v1.Criteria + 6, // 8: banyandb.model.v1.LogicalExpression.right:type_name -> banyandb.model.v1.Criteria + 0, // 9: banyandb.model.v1.QueryOrder.sort:type_name -> banyandb.model.v1.Sort + 11, // 10: banyandb.model.v1.TagProjection.tag_families:type_name -> banyandb.model.v1.TagProjection.TagFamily + 13, // 11: banyandb.model.v1.TimeRange.begin:type_name -> google.protobuf.Timestamp + 13, // 12: banyandb.model.v1.TimeRange.end:type_name -> google.protobuf.Timestamp + 13, // [13:13] is the sub-list for method output_type + 13, // [13:13] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name } func init() { file_banyandb_model_v1_query_proto_init() } @@ -815,7 +979,7 @@ func file_banyandb_model_v1_query_proto_init() { } } file_banyandb_model_v1_query_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*QueryOrder); i { + switch v := v.(*LogicalExpression); i { case 0: return &v.state case 1: @@ -827,7 +991,7 @@ func file_banyandb_model_v1_query_proto_init() { } } file_banyandb_model_v1_query_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TagProjection); i { + switch v := v.(*QueryOrder); i { case 0: return &v.state case 1: @@ -839,7 +1003,7 @@ func file_banyandb_model_v1_query_proto_init() { } } file_banyandb_model_v1_query_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TimeRange); i { + switch v := v.(*TagProjection); i { case 0: return &v.state case 1: @@ -851,6 +1015,18 @@ func file_banyandb_model_v1_query_proto_init() { } } file_banyandb_model_v1_query_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TimeRange); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_banyandb_model_v1_query_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TagProjection_TagFamily); i { case 0: return &v.state @@ -863,13 +1039,17 @@ func file_banyandb_model_v1_query_proto_init() { } } } + file_banyandb_model_v1_query_proto_msgTypes[3].OneofWrappers = []interface{}{ + (*Criteria_Le)(nil), + (*Criteria_Condition)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_banyandb_model_v1_query_proto_rawDesc, - NumEnums: 2, - NumMessages: 8, + NumEnums: 3, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/api/proto/banyandb/model/v1/query.pb.validate.go b/api/proto/banyandb/model/v1/query.pb.validate.go index 0463597..2cd1d73 100644 --- a/api/proto/banyandb/model/v1/query.pb.validate.go +++ b/api/proto/banyandb/model/v1/query.pb.validate.go @@ -453,17 +453,16 @@ func (m *Criteria) validate(all bool) error { var errors []error - // no validation rules for TagFamilyName + switch m.Exp.(type) { - for idx, item := range m.GetConditions() { - _, _ = idx, item + case *Criteria_Le: if all { - switch v := interface{}(item).(type) { + switch v := interface{}(m.GetLe()).(type) { case interface{ ValidateAll() error }: if err := v.ValidateAll(); err != nil { errors = append(errors, CriteriaValidationError{ - field: fmt.Sprintf("Conditions[%v]", idx), + field: "Le", reason: "embedded message failed validation", cause: err, }) @@ -471,16 +470,47 @@ func (m *Criteria) validate(all bool) error { case interface{ Validate() error }: if err := v.Validate(); err != nil { errors = append(errors, CriteriaValidationError{ - field: fmt.Sprintf("Conditions[%v]", idx), + field: "Le", reason: "embedded message failed validation", cause: err, }) } } - } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { + } else if v, ok := interface{}(m.GetLe()).(interface{ Validate() error }); ok { if err := v.Validate(); err != nil { return CriteriaValidationError{ - field: fmt.Sprintf("Conditions[%v]", idx), + field: "Le", + reason: "embedded message failed validation", + cause: err, + } + } + } + + case *Criteria_Condition: + + if all { + switch v := interface{}(m.GetCondition()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, CriteriaValidationError{ + field: "Condition", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, CriteriaValidationError{ + field: "Condition", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetCondition()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return CriteriaValidationError{ + field: "Condition", reason: "embedded message failed validation", cause: err, } @@ -566,6 +596,168 @@ var _ interface { ErrorName() string } = CriteriaValidationError{} +// Validate checks the field values on LogicalExpression with the rules defined +// in the proto definition for this message. If any rules are violated, the +// first error encountered is returned, or nil if there are no violations. +func (m *LogicalExpression) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on LogicalExpression with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// LogicalExpressionMultiError, or nil if none found. +func (m *LogicalExpression) ValidateAll() error { + return m.validate(true) +} + +func (m *LogicalExpression) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + // no validation rules for Op + + if all { + switch v := interface{}(m.GetLeft()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, LogicalExpressionValidationError{ + field: "Left", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, LogicalExpressionValidationError{ + field: "Left", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetLeft()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return LogicalExpressionValidationError{ + field: "Left", + reason: "embedded message failed validation", + cause: err, + } + } + } + + if all { + switch v := interface{}(m.GetRight()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, LogicalExpressionValidationError{ + field: "Right", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, LogicalExpressionValidationError{ + field: "Right", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetRight()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return LogicalExpressionValidationError{ + field: "Right", + reason: "embedded message failed validation", + cause: err, + } + } + } + + if len(errors) > 0 { + return LogicalExpressionMultiError(errors) + } + + return nil +} + +// LogicalExpressionMultiError is an error wrapping multiple validation errors +// returned by LogicalExpression.ValidateAll() if the designated constraints +// aren't met. +type LogicalExpressionMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m LogicalExpressionMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m LogicalExpressionMultiError) AllErrors() []error { return m } + +// LogicalExpressionValidationError is the validation error returned by +// LogicalExpression.Validate if the designated constraints aren't met. +type LogicalExpressionValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e LogicalExpressionValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e LogicalExpressionValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e LogicalExpressionValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e LogicalExpressionValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e LogicalExpressionValidationError) ErrorName() string { + return "LogicalExpressionValidationError" +} + +// Error satisfies the builtin error interface +func (e LogicalExpressionValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sLogicalExpression.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = LogicalExpressionValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = LogicalExpressionValidationError{} + // Validate checks the field values on QueryOrder with the rules defined in the // proto definition for this message. If any rules are violated, the first // error encountered is returned, or nil if there are no violations. diff --git a/api/proto/banyandb/model/v1/query.proto b/api/proto/banyandb/model/v1/query.proto index e26e0c5..68c7033 100644 --- a/api/proto/banyandb/model/v1/query.proto +++ b/api/proto/banyandb/model/v1/query.proto @@ -72,8 +72,23 @@ message Condition { // tag_families are indexed. message Criteria { - string tag_family_name = 1; - repeated model.v1.Condition conditions = 2; + oneof exp { + LogicalExpression le = 1; + Condition condition = 2; + } +} + +// LogicalExpression supports logical operation +message LogicalExpression { + enum LogicalOp { + LOGICAL_OP_UNSPECIFIED = 0; + LOGICAL_OP_AND = 1; + LOGICAL_OP_OR = 2; + } + // op is a logial operation + LogicalOp op = 1; + Criteria left = 2; + Criteria right = 3; } enum Sort { diff --git a/api/proto/banyandb/stream/v1/query.pb.go b/api/proto/banyandb/stream/v1/query.pb.go index 5a06805..e4820f6 100644 --- a/api/proto/banyandb/stream/v1/query.pb.go +++ b/api/proto/banyandb/stream/v1/query.pb.go @@ -186,7 +186,7 @@ type QueryRequest struct { // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported OrderBy *v1.QueryOrder `protobuf:"bytes,5,opt,name=order_by,json=orderBy,proto3" json:"order_by,omitempty"` // tag_families are indexed. - Criteria []*v1.Criteria `protobuf:"bytes,6,rep,name=criteria,proto3" json:"criteria,omitempty"` + Criteria *v1.Criteria `protobuf:"bytes,6,opt,name=criteria,proto3" json:"criteria,omitempty"` // projection can be used to select the key names of the element in the response Projection *v1.TagProjection `protobuf:"bytes,7,opt,name=projection,proto3" json:"projection,omitempty"` } @@ -258,7 +258,7 @@ func (x *QueryRequest) GetOrderBy() *v1.QueryOrder { return nil } -func (x *QueryRequest) GetCriteria() []*v1.Criteria { +func (x *QueryRequest) GetCriteria() *v1.Criteria { if x != nil { return x.Criteria } @@ -317,7 +317,7 @@ var file_banyandb_stream_v1_query_proto_rawDesc = []byte{ 0x32, 0x1d, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x52, 0x07, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x42, 0x79, 0x12, 0x37, 0x0a, 0x08, 0x63, 0x72, 0x69, 0x74, - 0x65, 0x72, 0x69, 0x61, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, + 0x65, 0x72, 0x69, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x52, 0x08, 0x63, 0x72, 0x69, 0x74, 0x65, 0x72, 0x69, 0x61, 0x12, 0x4a, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, diff --git a/api/proto/banyandb/stream/v1/query.pb.validate.go b/api/proto/banyandb/stream/v1/query.pb.validate.go index d7cc760..e9464e6 100644 --- a/api/proto/banyandb/stream/v1/query.pb.validate.go +++ b/api/proto/banyandb/stream/v1/query.pb.validate.go @@ -456,38 +456,33 @@ func (m *QueryRequest) validate(all bool) error { } } - for idx, item := range m.GetCriteria() { - _, _ = idx, item - - if all { - switch v := interface{}(item).(type) { - case interface{ ValidateAll() error }: - if err := v.ValidateAll(); err != nil { - errors = append(errors, QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } - case interface{ Validate() error }: - if err := v.Validate(); err != nil { - errors = append(errors, QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), - reason: "embedded message failed validation", - cause: err, - }) - } + if all { + switch v := interface{}(m.GetCriteria()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, QueryRequestValidationError{ + field: "Criteria", + reason: "embedded message failed validation", + cause: err, + }) } - } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { + case interface{ Validate() error }: if err := v.Validate(); err != nil { - return QueryRequestValidationError{ - field: fmt.Sprintf("Criteria[%v]", idx), + errors = append(errors, QueryRequestValidationError{ + field: "Criteria", reason: "embedded message failed validation", cause: err, - } + }) + } + } + } else if v, ok := interface{}(m.GetCriteria()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return QueryRequestValidationError{ + field: "Criteria", + reason: "embedded message failed validation", + cause: err, } } - } if m.GetProjection() == nil { diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto index 616d219..b4539fe 100644 --- a/api/proto/banyandb/stream/v1/query.proto +++ b/api/proto/banyandb/stream/v1/query.proto @@ -68,7 +68,7 @@ message QueryRequest { // order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported model.v1.QueryOrder order_by = 5; // tag_families are indexed. - repeated model.v1.Criteria criteria = 6; + model.v1.Criteria criteria = 6; // projection can be used to select the key names of the element in the response model.v1.TagProjection projection = 7 [(validate.rules).message.required = true]; } diff --git a/api/proto/openapi/banyandb/database/v1/rpc.swagger.json b/api/proto/openapi/banyandb/database/v1/rpc.swagger.json index f301bd6..a452822 100644 --- a/api/proto/openapi/banyandb/database/v1/rpc.swagger.json +++ b/api/proto/openapi/banyandb/database/v1/rpc.swagger.json @@ -1377,6 +1377,15 @@ "default": "ANALYZER_UNSPECIFIED", "description": " - ANALYZER_KEYWORD: Keyword analyzer is a “noop” analyzer which returns the entire input string as a single token.\n - ANALYZER_STANDARD: Standard analyzer provides grammar based tokenization\n - ANALYZER_SIMPLE: Simple analyzer breaks text into tokens at any non-letter character, \nsuch as numbers, spaces, hyphens and apostrophes, discards non-letter characters, \nand changes uppercase to lowercase." }, + "LogicalExpressionLogicalOp": { + "type": "string", + "enum": [ + "LOGICAL_OP_UNSPECIFIED", + "LOGICAL_OP_AND", + "LOGICAL_OP_OR" + ], + "default": "LOGICAL_OP_UNSPECIFIED" + }, "protobufAny": { "type": "object", "properties": { @@ -1447,14 +1456,11 @@ "v1Criteria": { "type": "object", "properties": { - "tagFamilyName": { - "type": "string" + "le": { + "$ref": "#/definitions/v1LogicalExpression" }, - "conditions": { - "type": "array", - "items": { - "$ref": "#/definitions/v1Condition" - } + "condition": { + "$ref": "#/definitions/v1Condition" } }, "description": "tag_families are indexed." @@ -1774,6 +1780,22 @@ } } }, + "v1LogicalExpression": { + "type": "object", + "properties": { + "op": { + "$ref": "#/definitions/LogicalExpressionLogicalOp", + "title": "op is a logial operation" + }, + "left": { + "$ref": "#/definitions/v1Criteria" + }, + "right": { + "$ref": "#/definitions/v1Criteria" + } + }, + "title": "LogicalExpression supports logical operation" + }, "v1Measure": { "type": "object", "properties": { diff --git a/api/proto/openapi/banyandb/measure/v1/rpc.swagger.json b/api/proto/openapi/banyandb/measure/v1/rpc.swagger.json index 843f477..a59bf95 100644 --- a/api/proto/openapi/banyandb/measure/v1/rpc.swagger.json +++ b/api/proto/openapi/banyandb/measure/v1/rpc.swagger.json @@ -81,6 +81,15 @@ } } }, + "LogicalExpressionLogicalOp": { + "type": "string", + "enum": [ + "LOGICAL_OP_UNSPECIFIED", + "LOGICAL_OP_AND", + "LOGICAL_OP_OR" + ], + "default": "LOGICAL_OP_UNSPECIFIED" + }, "QueryRequestAggregation": { "type": "object", "properties": { @@ -237,14 +246,11 @@ "v1Criteria": { "type": "object", "properties": { - "tagFamilyName": { - "type": "string" + "le": { + "$ref": "#/definitions/v1LogicalExpression" }, - "conditions": { - "type": "array", - "items": { - "$ref": "#/definitions/v1Condition" - } + "condition": { + "$ref": "#/definitions/v1Condition" } }, "description": "tag_families are indexed." @@ -346,6 +352,22 @@ } } }, + "v1LogicalExpression": { + "type": "object", + "properties": { + "op": { + "$ref": "#/definitions/LogicalExpressionLogicalOp", + "title": "op is a logial operation" + }, + "left": { + "$ref": "#/definitions/v1Criteria" + }, + "right": { + "$ref": "#/definitions/v1Criteria" + } + }, + "title": "LogicalExpression supports logical operation" + }, "v1Metadata": { "type": "object", "properties": { @@ -398,10 +420,7 @@ "description": "time_range is a range query with begin/end time of entities in the timeunit of milliseconds." }, "criteria": { - "type": "array", - "items": { - "$ref": "#/definitions/v1Criteria" - }, + "$ref": "#/definitions/v1Criteria", "description": "tag_families are indexed." }, "tagProjection": { diff --git a/api/proto/openapi/banyandb/stream/v1/rpc.swagger.json b/api/proto/openapi/banyandb/stream/v1/rpc.swagger.json index 71710d8..c07f760 100644 --- a/api/proto/openapi/banyandb/stream/v1/rpc.swagger.json +++ b/api/proto/openapi/banyandb/stream/v1/rpc.swagger.json @@ -70,6 +70,15 @@ "default": "BINARY_OP_UNSPECIFIED", "description": "BinaryOp specifies the operation imposed to the given query condition\nFor EQ, NE, LT, GT, LE and GE, only one operand should be given, i.e. one-to-one relationship.\nHAVING and NOT_HAVING allow multi-value to be the operand such as array/vector, i.e. one-to-many relationship.\nFor example, \"keyA\" contains \"valueA\" **and** \"valueB\"\nMATCH performances a full-text search if the tag is analyzed.\nThe string value applies to the same analyzer as the tag, but stri [...] }, + "LogicalExpressionLogicalOp": { + "type": "string", + "enum": [ + "LOGICAL_OP_UNSPECIFIED", + "LOGICAL_OP_AND", + "LOGICAL_OP_OR" + ], + "default": "LOGICAL_OP_UNSPECIFIED" + }, "modelv1Tag": { "type": "object", "properties": { @@ -149,14 +158,11 @@ "v1Criteria": { "type": "object", "properties": { - "tagFamilyName": { - "type": "string" + "le": { + "$ref": "#/definitions/v1LogicalExpression" }, - "conditions": { - "type": "array", - "items": { - "$ref": "#/definitions/v1Condition" - } + "condition": { + "$ref": "#/definitions/v1Condition" } }, "description": "tag_families are indexed." @@ -233,6 +239,22 @@ } } }, + "v1LogicalExpression": { + "type": "object", + "properties": { + "op": { + "$ref": "#/definitions/LogicalExpressionLogicalOp", + "title": "op is a logial operation" + }, + "left": { + "$ref": "#/definitions/v1Criteria" + }, + "right": { + "$ref": "#/definitions/v1Criteria" + } + }, + "title": "LogicalExpression supports logical operation" + }, "v1Metadata": { "type": "object", "properties": { @@ -299,10 +321,7 @@ "title": "order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported" }, "criteria": { - "type": "array", - "items": { - "$ref": "#/definitions/v1Criteria" - }, + "$ref": "#/definitions/v1Criteria", "description": "tag_families are indexed." }, "projection": { diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go index dca7791..b3f08e1 100644 --- a/banyand/tsdb/series_seek.go +++ b/banyand/tsdb/series_seek.go @@ -40,7 +40,7 @@ type Item interface { } type SeekerBuilder interface { - Filter(indexRule *databasev1.IndexRule, condition Condition) SeekerBuilder + Filter(predicator index.Filter) SeekerBuilder OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder OrderByTime(order modelv1.Sort) SeekerBuilder Build() (Seeker, error) @@ -55,12 +55,7 @@ var _ SeekerBuilder = (*seekerBuilder)(nil) type seekerBuilder struct { seriesSpan *seriesSpan - conditions []struct { - indexRuleType databasev1.IndexRule_Type - indexRuleID uint32 - indexRuleAnalyzer databasev1.IndexRule_Analyzer - condition Condition - } + predicator index.Filter order modelv1.Sort indexRuleForSorting *databasev1.IndexRule rangeOptsForSorting index.RangeOpts @@ -70,11 +65,7 @@ func (s *seekerBuilder) Build() (Seeker, error) { if s.order == modelv1.Sort_SORT_UNSPECIFIED { s.order = modelv1.Sort_SORT_ASC } - conditions, err := s.buildConditions() - if err != nil { - return nil, err - } - se, err := s.buildSeries(conditions) + se, err := s.buildSeries() if err != nil { return nil, err } diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go index 2f14489..6890167 100644 --- a/banyand/tsdb/series_seek_filter.go +++ b/banyand/tsdb/series_seek_filter.go @@ -22,114 +22,28 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/posting" ) var ErrUnsupportedIndexRule = errors.New("the index rule is not supported") -type Condition map[string][]index.ConditionValue - -func (s *seekerBuilder) Filter(indexRule *databasev1.IndexRule, condition Condition) SeekerBuilder { - s.conditions = append(s.conditions, struct { - indexRuleType databasev1.IndexRule_Type - indexRuleID uint32 - indexRuleAnalyzer databasev1.IndexRule_Analyzer - condition Condition - }{ - indexRuleType: indexRule.GetType(), - indexRuleID: indexRule.GetMetadata().GetId(), - indexRuleAnalyzer: indexRule.Analyzer, - condition: condition, - }) +func (s *seekerBuilder) Filter(predicator index.Filter) SeekerBuilder { + s.predicator = predicator return s } -type condWithIRT struct { - indexRuleType databasev1.IndexRule_Type - condition index.Condition -} - -func (s *seekerBuilder) buildConditions() ([]condWithIRT, error) { - if len(s.conditions) < 1 { - return nil, nil - } - conditions := make([]condWithIRT, 0, len(s.conditions)) - for _, condition := range s.conditions { - if len(condition.condition) > 1 { - // TODO:// should support composite index rule - return nil, ErrUnsupportedIndexRule - } - cond := make(index.Condition) - term := index.FieldKey{ - SeriesID: s.seriesSpan.seriesID, - IndexRuleID: condition.indexRuleID, - Analyzer: condition.indexRuleAnalyzer, - } - for _, c := range condition.condition { - cond[term] = c - break - } - conditions = append(conditions, condWithIRT{indexRuleType: condition.indexRuleType, condition: cond}) - } - return conditions, nil -} - -func (s *seekerBuilder) buildIndexFilter(block blockDelegate, conditions []condWithIRT) (filterFn, error) { - var allItemIDs posting.List - addIDs := func(allList posting.List, searcher index.Searcher, cond index.Condition) (posting.List, bool, error) { - tree, err := index.BuildTree(searcher, cond) - if err != nil { - return nil, false, err - } - rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{ - SeriesID: s.seriesSpan.seriesID, - IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(), - }) - if found { - s.rangeOptsForSorting = rangeOpts - } - list, err := tree.Execute() - if errors.Is(err, index.ErrEmptyTree) { - return allList, false, nil - } - if err != nil { - return nil, false, err - } - if allList == nil { - allList = list - } else { - err = allList.Intersect(list) - if err != nil { - return nil, false, err - } - } - return allList, true, nil - } - allInvalid := true - for i, condition := range conditions { - var valid bool - var err error - switch condition.indexRuleType { +func (s *seekerBuilder) buildIndexFilter(block blockDelegate) (filterFn, error) { + allItemIDs, err := s.predicator.Execute(func(typ databasev1.IndexRule_Type) (index.Searcher, error) { + switch typ { case databasev1.IndexRule_TYPE_INVERTED: - allItemIDs, valid, err = addIDs(allItemIDs, block.invertedIndexReader(), condition.condition) + return block.invertedIndexReader(), nil case databasev1.IndexRule_TYPE_TREE: - allItemIDs, valid, err = addIDs(allItemIDs, block.lsmIndexReader(), condition.condition) + return block.lsmIndexReader(), nil default: return nil, ErrUnsupportedIndexRule } - if err != nil { - return nil, err - } - if i > 0 && allItemIDs.IsEmpty() { - return func(_ Item) bool { - return false - }, nil - } - allInvalid = allInvalid && !valid - } - - if allInvalid { - return nil, nil + }, s.seriesSpan.seriesID) + if err != nil { + return nil, err } return func(item Item) bool { valid := allItemIDs.Contains(item.ID()) diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index 481134e..76bc194 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -47,14 +47,14 @@ func (s *seekerBuilder) OrderByTime(order modelv1.Sort) SeekerBuilder { return s } -func (s *seekerBuilder) buildSeries(conditions []condWithIRT) ([]Iterator, error) { +func (s *seekerBuilder) buildSeries() ([]Iterator, error) { if s.indexRuleForSorting == nil { - return s.buildSeriesByTime(conditions) + return s.buildSeriesByTime() } - return s.buildSeriesByIndex(conditions) + return s.buildSeriesByIndex() } -func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []Iterator, err error) { +func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) { timeFilter := func(item Item) bool { valid := s.seriesSpan.timeRange.Contains(item.Time()) timeRange := s.seriesSpan.timeRange @@ -71,7 +71,7 @@ func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []I IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(), } filters := []filterFn{timeFilter} - filter, err := s.buildIndexFilter(b, conditions) + filter, err := s.buildIndexFilter(b) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []I return } -func (s *seekerBuilder) buildSeriesByTime(conditions []condWithIRT) ([]Iterator, error) { +func (s *seekerBuilder) buildSeriesByTime() ([]Iterator, error) { bb := s.seriesSpan.blocks switch s.order { case modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_UNSPECIFIED: @@ -129,7 +129,7 @@ func (s *seekerBuilder) buildSeriesByTime(conditions []condWithIRT) ([]Iterator, return nil, err } if inner != nil { - filter, err := s.buildIndexFilter(b, conditions) + filter, err := s.buildIndexFilter(b) if err != nil { return nil, err } diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 9ad7cbe..ee4d284 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -59,6 +59,20 @@ func (e Entity) Prepend(entry Entry) Entity { return d } +func (e Entity) Copy() Entity { + a := make(Entity, len(e)) + copy(a, e) + return a +} + +func NewEntity(len int) Entity { + e := make(Entity, len) + for i := 0; i < len; i++ { + e[i] = AnyEntry + } + return e +} + type Path struct { prefix []byte seekKey []byte diff --git a/docs/api-reference.md b/docs/api-reference.md index d871d63..d00648c 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -36,6 +36,7 @@ - [banyandb/model/v1/query.proto](#banyandb_model_v1_query-proto) - [Condition](#banyandb-model-v1-Condition) - [Criteria](#banyandb-model-v1-Criteria) + - [LogicalExpression](#banyandb-model-v1-LogicalExpression) - [QueryOrder](#banyandb-model-v1-QueryOrder) - [Tag](#banyandb-model-v1-Tag) - [TagFamily](#banyandb-model-v1-TagFamily) @@ -44,6 +45,7 @@ - [TimeRange](#banyandb-model-v1-TimeRange) - [Condition.BinaryOp](#banyandb-model-v1-Condition-BinaryOp) + - [LogicalExpression.LogicalOp](#banyandb-model-v1-LogicalExpression-LogicalOp) - [Sort](#banyandb-model-v1-Sort) - [banyandb/database/v1/schema.proto](#banyandb_database_v1_schema-proto) @@ -609,8 +611,25 @@ tag_families are indexed. | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| tag_family_name | [string](#string) | | | -| conditions | [Condition](#banyandb-model-v1-Condition) | repeated | | +| le | [LogicalExpression](#banyandb-model-v1-LogicalExpression) | | | +| condition | [Condition](#banyandb-model-v1-Condition) | | | + + + + + + +<a name="banyandb-model-v1-LogicalExpression"></a> + +### LogicalExpression +LogicalExpression supports logical operation + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| op | [LogicalExpression.LogicalOp](#banyandb-model-v1-LogicalExpression-LogicalOp) | | op is a logial operation | +| left | [Criteria](#banyandb-model-v1-Criteria) | | | +| right | [Criteria](#banyandb-model-v1-Criteria) | | | @@ -747,6 +766,19 @@ Each item in a string array is seen as a token instead of a query expression. +<a name="banyandb-model-v1-LogicalExpression-LogicalOp"></a> + +### LogicalExpression.LogicalOp + + +| Name | Number | Description | +| ---- | ------ | ----------- | +| LOGICAL_OP_UNSPECIFIED | 0 | | +| LOGICAL_OP_AND | 1 | | +| LOGICAL_OP_OR | 2 | | + + + <a name="banyandb-model-v1-Sort"></a> ### Sort @@ -2047,7 +2079,7 @@ QueryRequest is the request contract for query. | ----- | ---- | ----- | ----------- | | metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | metadata is required | | time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) | | time_range is a range query with begin/end time of entities in the timeunit of milliseconds. | -| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | repeated | tag_families are indexed. | +| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | | tag_families are indexed. | | tag_projection | [banyandb.model.v1.TagProjection](#banyandb-model-v1-TagProjection) | | tag_projection can be used to select tags of the data points in the response | | field_projection | [QueryRequest.FieldProjection](#banyandb-measure-v1-QueryRequest-FieldProjection) | | field_projection can be used to select fields of the data points in the response | | group_by | [QueryRequest.GroupBy](#banyandb-measure-v1-QueryRequest-GroupBy) | | group_by groups data points based on their field value for a specific tag and use field_name as the projection name | @@ -2595,7 +2627,7 @@ QueryRequest is the request contract for query. | offset | [uint32](#uint32) | | offset is used to support pagination, together with the following limit | | limit | [uint32](#uint32) | | limit is used to impose a boundary on the number of records being returned | | order_by | [banyandb.model.v1.QueryOrder](#banyandb-model-v1-QueryOrder) | | order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported | -| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | repeated | tag_families are indexed. | +| criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | | tag_families are indexed. | | projection | [banyandb.model.v1.TagProjection](#banyandb-model-v1-TagProjection) | | projection can be used to select the key names of the element in the response | diff --git a/pkg/index/index.go b/pkg/index/index.go index ad53ba5..6f490cd 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -19,6 +19,7 @@ package index import ( "bytes" + "fmt" "io" "github.com/pkg/errors" @@ -36,7 +37,6 @@ var ErrMalformed = errors.New("the data is malformed") type FieldKey struct { SeriesID common.SeriesID IndexRuleID uint32 - EncodeTerm bool Analyzer databasev1.IndexRule_Analyzer } @@ -169,3 +169,10 @@ type Store interface { // Flush flushed memory data to disk Flush() error } + +type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error) + +type Filter interface { + fmt.Stringer + Execute(getSearcher GetSearcher, seriesID common.SeriesID) (posting.List, error) +} diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go index cfa1dcb..4d889cd 100644 --- a/pkg/index/inverted/inverted_test.go +++ b/pkg/index/inverted/inverted_test.go @@ -37,7 +37,6 @@ import ( var serviceName = index.FieldKey{ // http_method IndexRuleID: 6, - EncodeTerm: false, Analyzer: databasev1.IndexRule_ANALYZER_SIMPLE, } diff --git a/pkg/index/testcases/service_name.go b/pkg/index/testcases/service_name.go index 5c711f7..60d7e18 100644 --- a/pkg/index/testcases/service_name.go +++ b/pkg/index/testcases/service_name.go @@ -31,7 +31,6 @@ import ( var serviceName = index.FieldKey{ // http_method IndexRuleID: 6, - EncodeTerm: false, } func RunServiceName(t *testing.T, store SimpleStore) { diff --git a/pkg/index/tree.go b/pkg/index/tree.go deleted file mode 100644 index 7718643..0000000 --- a/pkg/index/tree.go +++ /dev/null @@ -1,367 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package index - -import ( - "bytes" - "encoding/base64" - "encoding/json" - "strings" - - "github.com/pkg/errors" - - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/index/posting" -) - -var ( - ErrNotRangeOperation = errors.New("this is not an range operation") - ErrEmptyTree = errors.New("tree is empty") -) - -type Executor interface { - Execute() (posting.List, error) -} - -type Tree interface { - Executor - TrimRangeLeaf(key FieldKey) (rangeOpts RangeOpts, found bool) -} - -type Condition map[FieldKey][]ConditionValue - -type ConditionValue struct { - Values [][]byte - Op modelv1.Condition_BinaryOp -} - -func BuildTree(searcher Searcher, condMap Condition) (Tree, error) { - root := &andNode{ - node: &node{ - SubNodes: make([]Executor, 0), - searcher: searcher, - }, - } - for key, conds := range condMap { - var rangeLeaf *rangeOp - for _, cond := range conds { - if rangeLeaf != nil && !rangeOP(cond.Op) { - return nil, errors.Wrapf(ErrNotRangeOperation, "op:%s", cond.Op.String()) - } - if rangeOP(cond.Op) { - if rangeLeaf == nil { - rangeLeaf = root.addRangeLeaf(key) - } - opts := rangeLeaf.Opts - switch cond.Op { - case modelv1.Condition_BINARY_OP_GT: - opts.Lower = bytes.Join(cond.Values, nil) - case modelv1.Condition_BINARY_OP_GE: - opts.Lower = bytes.Join(cond.Values, nil) - opts.IncludesLower = true - case modelv1.Condition_BINARY_OP_LT: - opts.Upper = bytes.Join(cond.Values, nil) - case modelv1.Condition_BINARY_OP_LE: - opts.Upper = bytes.Join(cond.Values, nil) - opts.IncludesUpper = true - } - continue - } - switch cond.Op { - case modelv1.Condition_BINARY_OP_EQ: - root.addEq(key, cond.Values) - case modelv1.Condition_BINARY_OP_MATCH: - root.addMatch(key, cond.Values) - case modelv1.Condition_BINARY_OP_NE: - root.addNot(key, root.newEq(key, cond.Values)) - case modelv1.Condition_BINARY_OP_HAVING: - n := root.addAndNode(len(cond.Values)) - for _, v := range cond.Values { - n.addEq(key, [][]byte{v}) - } - case modelv1.Condition_BINARY_OP_NOT_HAVING: - n := root.newAndNode(len(cond.Values)) - for _, v := range cond.Values { - n.addEq(key, [][]byte{v}) - } - root.addNot(key, n) - } - } - } - return root, nil -} - -func rangeOP(op modelv1.Condition_BinaryOp) bool { - switch op { - case modelv1.Condition_BINARY_OP_GT, - modelv1.Condition_BINARY_OP_GE, - modelv1.Condition_BINARY_OP_LT, - modelv1.Condition_BINARY_OP_LE: - return true - } - return false -} - -type logicalOP interface { - Executor - merge(posting.List) error -} - -type node struct { - searcher Searcher - value posting.List - SubNodes []Executor `json:"sub_nodes,omitempty"` -} - -func (n *node) newEq(key FieldKey, values [][]byte) *eq { - return &eq{ - leaf: &leaf{ - Key: key, - Values: values, - searcher: n.searcher, - }, - } -} - -func (n *node) newMatch(key FieldKey, values [][]byte) *match { - return &match{ - leaf: &leaf{ - Key: key, - Values: values, - searcher: n.searcher, - }, - } -} - -func (n *node) addMatch(key FieldKey, values [][]byte) { - n.SubNodes = append(n.SubNodes, n.newMatch(key, values)) -} - -func (n *node) addEq(key FieldKey, values [][]byte) { - n.SubNodes = append(n.SubNodes, n.newEq(key, values)) -} - -func (n *node) addNot(key FieldKey, inner Executor) { - n.SubNodes = append(n.SubNodes, ¬{ - Key: key, - searcher: n.searcher, - Inner: inner, - }) -} - -func (n *node) addRangeLeaf(key FieldKey) *rangeOp { - r := &rangeOp{ - leaf: &leaf{ - Key: key, - searcher: n.searcher, - }, - Opts: &RangeOpts{}, - } - n.SubNodes = append(n.SubNodes, r) - return r -} - -func (n *node) newAndNode(size int) *andNode { - return &andNode{ - node: &node{ - searcher: n.searcher, - SubNodes: make([]Executor, 0, size), - }, - } -} - -func (n *node) addAndNode(size int) *andNode { - on := n.newAndNode(size) - n.SubNodes = append(n.SubNodes, on) - return on -} - -func (n *node) pop() (Executor, bool) { - if len(n.SubNodes) < 1 { - return nil, false - } - sn := n.SubNodes[0] - n.SubNodes = n.SubNodes[1:] - return sn, true -} - -func execute(n *node, lp logicalOP) (posting.List, error) { - ex, hasNext := n.pop() - if !hasNext { - if n.value == nil { - return nil, ErrEmptyTree - } - return n.value, nil - } - r, err := ex.Execute() - if err != nil { - return nil, err - } - if n.value == nil { - n.value = r - return lp.Execute() - } - err = lp.merge(r) - if err != nil { - return nil, err - } - if n.value.IsEmpty() { - return n.value, nil - } - return lp.Execute() -} - -type andNode struct { - *node -} - -func (an *andNode) TrimRangeLeaf(key FieldKey) (RangeOpts, bool) { - removeLeaf := func(s []Executor, index int) []Executor { - return append(s[:index], s[index+1:]...) - } - for i, subNode := range an.SubNodes { - leafRange, ok := subNode.(*rangeOp) - if !ok { - continue - } - if key.Equal(leafRange.Key) { - an.SubNodes = removeLeaf(an.SubNodes, i) - return *leafRange.Opts, true - } - } - return RangeOpts{}, false -} - -func (an *andNode) merge(list posting.List) error { - return an.value.Intersect(list) -} - -func (an *andNode) Execute() (posting.List, error) { - return execute(an.node, an) -} - -func (an *andNode) MarshalJSON() ([]byte, error) { - data := make(map[string]interface{}, 1) - data["and"] = an.node.SubNodes - return json.Marshal(data) -} - -type leaf struct { - Executor - Key FieldKey - Values [][]byte - searcher Searcher -} - -type not struct { - Executor - Key FieldKey - searcher Searcher - Inner Executor -} - -func (n *not) Execute() (posting.List, error) { - all, err := n.searcher.MatchField(n.Key) - if err != nil { - return nil, err - } - list, err := n.Inner.Execute() - if err != nil { - return nil, err - } - err = all.Difference(list) - return all, err -} - -func (n *not) MarshalJSON() ([]byte, error) { - data := make(map[string]interface{}, 1) - data["not"] = n.Inner - return json.Marshal(data) -} - -type eq struct { - *leaf -} - -func (eq *eq) Execute() (posting.List, error) { - return eq.searcher.MatchTerms(Field{ - Key: eq.Key, - Term: bytes.Join(eq.Values, nil), - }) -} - -func (eq *eq) MarshalJSON() ([]byte, error) { - data := make(map[string]interface{}, 1) - data["eq"] = eq.leaf - return json.Marshal(data) -} - -type match struct { - *leaf -} - -func (match *match) Execute() (posting.List, error) { - matches := make([]string, len(match.Values)) - for i, v := range match.Values { - matches[i] = string(v) - } - return match.searcher.Match( - match.Key, - matches, - ) -} - -func (match *match) MarshalJSON() ([]byte, error) { - data := make(map[string]interface{}, 1) - data["match"] = match.leaf - return json.Marshal(data) -} - -type rangeOp struct { - *leaf - Opts *RangeOpts -} - -func (r *rangeOp) Execute() (posting.List, error) { - return r.searcher.Range(r.Key, *r.Opts) -} - -func (r *rangeOp) MarshalJSON() ([]byte, error) { - data := make(map[string]interface{}, 1) - var builder strings.Builder - if r.Opts.Lower != nil { - if r.Opts.IncludesLower { - builder.WriteString("[") - } else { - builder.WriteString("(") - } - } - builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Lower)) - builder.WriteString(",") - builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Upper)) - if r.Opts.Upper != nil { - if r.Opts.IncludesUpper { - builder.WriteString("]") - } else { - builder.WriteString(")") - } - } - data["key"] = r.Key - data["range"] = builder.String() - return json.Marshal(data) -} diff --git a/pkg/query/logical/expr_literal.go b/pkg/query/logical/expr_literal.go index 4994840..c827e95 100644 --- a/pkg/query/logical/expr_literal.go +++ b/pkg/query/logical/expr_literal.go @@ -18,13 +18,15 @@ package logical import ( + "bytes" + "encoding/hex" "fmt" "strconv" + "strings" "golang.org/x/exp/slices" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" ) @@ -37,24 +39,21 @@ type int64Literal struct { int64 } -func (i *int64Literal) Compare(tagValue *modelv1.TagValue) (int, bool) { - intValue := tagValue.GetInt() - if intValue == nil { - return 0, false +func (i *int64Literal) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*int64Literal); ok { + return int(i.int64 - o.int64), true } - return int(i.int64 - intValue.Value), true + return 0, false } -func (i *int64Literal) BelongTo(tagValue *modelv1.TagValue) bool { - intValue := tagValue.GetInt() - if intValue != nil { - return i.int64 == intValue.Value +func (i *int64Literal) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*int64Literal); ok { + return i == o } - intArray := tagValue.GetIntArray() - if intArray == nil { - return false + if o, ok := other.(*int64ArrLiteral); ok { + return slices.Contains(o.arr, i.int64) } - return slices.Contains(intArray.Value, i.int64) + return false } func (i *int64Literal) Bytes() [][]byte { @@ -90,32 +89,26 @@ type int64ArrLiteral struct { arr []int64 } -func (i *int64ArrLiteral) Compare(tagValue *modelv1.TagValue) (int, bool) { - intArray := tagValue.GetIntArray() - if intArray == nil { - return 0, false - } - if slices.Equal(i.arr, intArray.Value) { - return 0, true +func (i *int64ArrLiteral) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*int64ArrLiteral); ok { + return 0, slices.Equal(i.arr, o.arr) } return 0, false } -func (i *int64ArrLiteral) BelongTo(tagValue *modelv1.TagValue) bool { - intValue := tagValue.GetInt() - if intValue != nil { - return slices.Contains(i.arr, intValue.Value) +func (i *int64ArrLiteral) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*int64Literal); ok { + return slices.Contains(i.arr, o.int64) } - intArray := tagValue.GetIntArray() - if intArray == nil { - return false - } - for _, v := range intArray.Value { - if !slices.Contains(i.arr, v) { - return false + if o, ok := other.(*int64ArrLiteral); ok { + for _, v := range o.arr { + if !slices.Contains(i.arr, v) { + return false + } } + return true } - return true + return false } func (i *int64ArrLiteral) Bytes() [][]byte { @@ -157,27 +150,21 @@ type strLiteral struct { string } -func (s *strLiteral) Compare(tagValue *modelv1.TagValue) (int, bool) { - strValue := tagValue.GetStr() - if strValue == nil { - return 0, false - } - if strValue.Value == s.string { - return 0, true +func (s *strLiteral) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*strLiteral); ok { + return strings.Compare(s.string, o.string), true } return 0, false } -func (s *strLiteral) BelongTo(tagValue *modelv1.TagValue) bool { - strValue := tagValue.GetStr() - if strValue != nil { - return s.string == strValue.Value +func (s *strLiteral) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*strLiteral); ok { + return s == o } - strArray := tagValue.GetStrArray() - if strArray == nil { - return false + if o, ok := other.(*strArrLiteral); ok { + return slices.Contains(o.arr, s.string) } - return slices.Contains(strArray.Value, s.string) + return false } func (s *strLiteral) Bytes() [][]byte { @@ -192,7 +179,7 @@ func (s *strLiteral) Equal(expr Expr) bool { return false } -func Str(str string) Expr { +func Str(str string) LiteralExpr { return &strLiteral{str} } @@ -213,32 +200,26 @@ type strArrLiteral struct { arr []string } -func (s *strArrLiteral) Compare(tagValue *modelv1.TagValue) (int, bool) { - strArray := tagValue.GetStrArray() - if strArray == nil { - return 0, false - } - if stringSlicesEqual(s.arr, strArray.Value) { - return 0, true +func (s *strArrLiteral) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*strArrLiteral); ok { + return 0, stringSlicesEqual(s.arr, o.arr) } return 0, false } -func (s *strArrLiteral) BelongTo(tagValue *modelv1.TagValue) bool { - strValue := tagValue.GetStr() - if strValue != nil { - return slices.Contains(s.arr, strValue.Value) +func (s *strArrLiteral) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*strLiteral); ok { + return slices.Contains(s.arr, o.string) } - strArray := tagValue.GetStrArray() - if strArray == nil { - return false - } - for _, v := range strArray.Value { - if !slices.Contains(s.arr, v) { - return false + if o, ok := other.(*strArrLiteral); ok { + for _, v := range o.arr { + if !slices.Contains(s.arr, v) { + return false + } } + return true } - return true + return false } func (s *strArrLiteral) Bytes() [][]byte { @@ -281,6 +262,26 @@ func (s *idLiteral) Bytes() [][]byte { return [][]byte{[]byte(s.string)} } +func (s *idLiteral) Compare(other LiteralExpr) (int, bool) { + if o, ok := other.(*idLiteral); ok { + return strings.Compare(s.string, o.string), true + } + return 0, false +} + +func (s *idLiteral) BelongTo(other LiteralExpr) bool { + if o, ok := other.(*idLiteral); ok { + return s == o + } + if o, ok := other.(*strLiteral); ok { + return s.string == o.string + } + if o, ok := other.(*strArrLiteral); ok { + return slices.Contains(o.arr, s.string) + } + return false +} + func (s *idLiteral) Equal(expr Expr) bool { if other, ok := expr.(*idLiteral); ok { return other.string == s.string @@ -289,7 +290,7 @@ func (s *idLiteral) Equal(expr Expr) bool { return false } -func ID(id string) Expr { +func ID(id string) LiteralExpr { return &idLiteral{id} } @@ -300,3 +301,33 @@ func (s *idLiteral) DataType() int32 { func (s *idLiteral) String() string { return s.string } + +var _ LiteralExpr = (*idLiteral)(nil) + +type bytesLiteral struct { + bb []byte +} + +func newBytesLiteral(bb []byte) *bytesLiteral { + return &bytesLiteral{bb: bb} +} + +func (b *bytesLiteral) Bytes() [][]byte { + return [][]byte{b.bb} +} + +func (b *bytesLiteral) Equal(expr Expr) bool { + if other, ok := expr.(*bytesLiteral); ok { + return bytes.Equal(other.bb, b.bb) + } + + return false +} + +func (b *bytesLiteral) DataType() int32 { + return int32(databasev1.TagType_TAG_TYPE_DATA_BINARY) +} + +func (b *bytesLiteral) String() string { + return hex.EncodeToString(b.bb) +} diff --git a/pkg/query/logical/index_filter.go b/pkg/query/logical/index_filter.go new file mode 100644 index 0000000..6a4f019 --- /dev/null +++ b/pkg/query/logical/index_filter.go @@ -0,0 +1,603 @@ +package logical + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "strings" + + "github.com/apache/skywalking-banyandb/api/common" + database_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/posting" + "github.com/pkg/errors" +) + +var ( + ErrNotRangeOperation = errors.New("this is not an range operation") + ErrEmptyTree = errors.New("tree is empty") +) + +type globalIndexError struct { + indexRule *database_v1.IndexRule + expr LiteralExpr +} + +func (g *globalIndexError) Error() string { return g.indexRule.String() } + +func buildLocalFilter(criteria *model_v1.Criteria, schema Schema, entityDict map[string]int, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { + switch criteria.GetExp().(type) { + case *model_v1.Criteria_Condition: + cond := criteria.GetCondition() + expr, entity, err := parseExprOrEntity(entityDict, entity, cond) + if err != nil { + return nil, nil, err + } + if entity != nil { + return nil, []tsdb.Entity{entity}, nil + } + if ok, indexRule := schema.IndexDefined(cond.Name); ok { + if indexRule.Location == database_v1.IndexRule_LOCATION_GLOBAL { + return nil, nil, &globalIndexError{ + indexRule: indexRule, + expr: expr, + } + } + return parseCondition(cond, indexRule, expr, entity) + } else { + return eNode, []tsdb.Entity{entity}, nil + } + case *model_v1.Criteria_Le: + le := criteria.GetLe() + left, leftEntities, err := buildLocalFilter(le.Left, schema, entityDict, entity) + if err != nil { + return nil, nil, err + } + right, rightEntities, err := buildLocalFilter(le.Left, schema, entityDict, entity) + if err != nil { + return nil, nil, err + } + entities := parseEntities(le.Op, entity, leftEntities, rightEntities) + if entities == nil { + return nil, nil, nil + } + switch le.Op { + case model_v1.LogicalExpression_LOGICAL_OP_AND: + and := newAnd(2) + and.append(left).append(right) + return and, entities, nil + case model_v1.LogicalExpression_LOGICAL_OP_OR: + or := newOr(2) + or.append(left).append(right) + return or, entities, nil + } + + } + return nil, nil, ErrInvalidConditionType +} + +func parseCondition(cond *model_v1.Condition, indexRule *database_v1.IndexRule, expr LiteralExpr, entity tsdb.Entity) (index.Filter, []tsdb.Entity, error) { + switch cond.Op { + case model_v1.Condition_BINARY_OP_GT: + return newRange(indexRule, index.RangeOpts{ + Lower: bytes.Join(expr.Bytes(), nil), + }), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_GE: + return newRange(indexRule, index.RangeOpts{ + IncludesLower: true, + Lower: bytes.Join(expr.Bytes(), nil), + }), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_LT: + return newRange(indexRule, index.RangeOpts{ + Upper: bytes.Join(expr.Bytes(), nil), + }), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_LE: + return newRange(indexRule, index.RangeOpts{ + IncludesUpper: true, + Upper: bytes.Join(expr.Bytes(), nil), + }), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_EQ: + return newEq(indexRule, expr), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_MATCH: + return newMatch(indexRule, expr), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_NE: + return newNot(indexRule, newEq(indexRule, expr)), []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_HAVING: + bb := expr.Bytes() + and := newAnd(len(bb)) + for _, b := range bb { + and.append(newEq(indexRule, newBytesLiteral(b))) + } + return and, []tsdb.Entity{entity}, nil + case model_v1.Condition_BINARY_OP_NOT_HAVING: + bb := expr.Bytes() + and := newAnd(len(bb)) + for _, b := range bb { + and.append(newEq(indexRule, newBytesLiteral(b))) + } + return newNot(indexRule, and), []tsdb.Entity{entity}, nil + } + return nil, nil, ErrInvalidConditionType +} + +func parseExprOrEntity(entityDict map[string]int, entity tsdb.Entity, cond *model_v1.Condition) (LiteralExpr, tsdb.Entity, error) { + entityIdx, ok := entityDict[cond.Name] + if ok && cond.Op != model_v1.Condition_BINARY_OP_EQ { + return nil, nil, errors.WithMessagef(ErrInvalidConditionType, "tag belongs to the entity only supports EQ operation in condition(%v)", cond) + } + switch v := cond.Value.Value.(type) { + case *model_v1.TagValue_Str: + if ok { + entity[entityIdx] = []byte(v.Str.GetValue()) + return nil, entity, nil + } + return Str(v.Str.GetValue()), nil, nil + case *model_v1.TagValue_Id: + if ok { + entity[entityIdx] = []byte(v.Id.GetValue()) + return nil, entity, nil + } + return ID(v.Id.GetValue()), nil, nil + + case *model_v1.TagValue_StrArray: + return &strArrLiteral{ + arr: v.StrArray.GetValue(), + }, nil, nil + case *model_v1.TagValue_Int: + if ok { + entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue()) + return nil, entity, nil + } + return &int64Literal{ + int64: v.Int.GetValue(), + }, nil, nil + case *model_v1.TagValue_IntArray: + return &int64ArrLiteral{ + arr: v.IntArray.GetValue(), + }, nil, nil + } + return nil, nil, ErrInvalidConditionType +} + +func parseEntities(op model_v1.LogicalExpression_LogicalOp, input tsdb.Entity, left, right []tsdb.Entity) []tsdb.Entity { + count := len(input) + result := make(tsdb.Entity, count) + mergedEntities := make([]tsdb.Entity, 0, len(left)+len(right)) + mergedEntities = append(mergedEntities, left...) + mergedEntities = append(mergedEntities, right...) + switch op { + case model_v1.LogicalExpression_LOGICAL_OP_AND: + for i := 0; i < count; i++ { + entry := tsdb.AnyEntry + for j := 0; j < len(mergedEntities); j++ { + e := mergedEntities[j][i] + if bytes.Equal(e, tsdb.AnyEntry) { + entry = e + } else if !bytes.Equal(entry, e) { + return nil + } + } + result[i] = entry + } + case model_v1.LogicalExpression_LOGICAL_OP_OR: + return mergedEntities + } + return nil +} + +type FieldKey struct { + *database_v1.IndexRule +} + +func newFieldKey(indexRule *database_v1.IndexRule) FieldKey { + return FieldKey{indexRule} +} + +func (fk FieldKey) ToIndex(seriesID common.SeriesID) index.FieldKey { + return index.FieldKey{ + IndexRuleID: fk.Metadata.Id, + Analyzer: fk.Analyzer, + SeriesID: seriesID, + } +} + +type logicalOP interface { + index.Filter + merge(posting.List) error +} + +type node struct { + value posting.List + SubNodes []index.Filter `json:"sub_nodes,omitempty"` +} + +func (n *node) append(sub index.Filter) *node { + n.SubNodes = append(n.SubNodes, sub) + return n +} + +func (n *node) pop() (index.Filter, bool) { + if len(n.SubNodes) < 1 { + return nil, false + } + sn := n.SubNodes[0] + n.SubNodes = n.SubNodes[1:] + return sn, true +} + +func execute(searcher index.GetSearcher, seriesID common.SeriesID, n *node, lp logicalOP) (posting.List, error) { + ex, hasNext := n.pop() + if !hasNext { + if n.value == nil { + return nil, ErrEmptyTree + } + return n.value, nil + } + r, err := ex.Execute(searcher, seriesID) + if err != nil { + return nil, err + } + err = lp.merge(r) + if err != nil { + return nil, err + } + return lp.Execute(searcher, seriesID) +} + +type andNode struct { + *node +} + +func newAnd(size int) *andNode { + return &andNode{ + node: &node{ + SubNodes: make([]index.Filter, 0, size), + }, + } +} + +func (an *andNode) merge(list posting.List) error { + if _, ok := list.(bypassList); ok { + return nil + } + if an.value == nil { + an.value = list + return nil + } + return an.value.Intersect(list) +} + +func (an *andNode) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + return execute(searcher, seriesID, an.node, an) +} + +func (an *andNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["and"] = an.node.SubNodes + return json.Marshal(data) +} + +func (an *andNode) String() string { + return jsonToString(an) +} + +type orNode struct { + *node +} + +func newOr(size int) *orNode { + return &orNode{ + node: &node{ + SubNodes: make([]index.Filter, 0, size), + }, + } +} + +func (on *orNode) merge(list posting.List) error { + // If a predicator is not indexed, all predicator are ignored. + // The tagFilter will take up this job to filter this items. + if _, ok := on.value.(bypassList); ok { + return nil + } + if _, ok := list.(bypassList); ok { + on.value = list + return nil + } + if on.value == nil { + on.value = list + } + return on.value.Union(list) +} + +func (on *orNode) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + return execute(searcher, seriesID, on.node, on) +} + +func (on *orNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["or"] = on.node.SubNodes + return json.Marshal(data) +} + +func (on *orNode) String() string { + return jsonToString(on) +} + +type leaf struct { + index.Filter + Key FieldKey + Expr LiteralExpr +} + +type not struct { + index.Filter + Key FieldKey + Inner index.Filter +} + +func newNot(indexRule *database_v1.IndexRule, inner index.Filter) *not { + return ¬{ + Key: newFieldKey(indexRule), + Inner: inner, + } +} + +func (n *not) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + s, err := searcher(n.Key.Type) + if err != nil { + return nil, err + } + all, err := s.MatchField(n.Key.ToIndex(seriesID)) + if err != nil { + return nil, err + } + list, err := n.Inner.Execute(searcher, seriesID) + if err != nil { + return nil, err + } + err = all.Difference(list) + return all, err +} + +func (n *not) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["not"] = n.Inner + return json.Marshal(data) +} + +func (n *not) String() string { + return jsonToString(n) +} + +type eq struct { + *leaf +} + +func newEq(indexRule *database_v1.IndexRule, values LiteralExpr) *eq { + return &eq{ + leaf: &leaf{ + Key: newFieldKey(indexRule), + Expr: values, + }, + } +} + +func (eq *eq) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + s, err := searcher(eq.Key.Type) + if err != nil { + return nil, err + } + return s.MatchTerms(index.Field{ + Key: eq.Key.ToIndex(seriesID), + Term: bytes.Join(eq.Expr.Bytes(), nil), + }) +} + +func (eq *eq) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["eq"] = eq.leaf + return json.Marshal(data) +} + +func (eq *eq) String() string { + return jsonToString(eq) +} + +type match struct { + *leaf +} + +func newMatch(indexRule *database_v1.IndexRule, values LiteralExpr) *match { + return &match{ + leaf: &leaf{ + Key: newFieldKey(indexRule), + Expr: values, + }, + } +} + +func (match *match) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + s, err := searcher(match.Key.Type) + if err != nil { + return nil, err + } + bb := match.Expr.Bytes() + matches := make([]string, len(bb)) + for i, v := range bb { + matches[i] = string(v) + } + return s.Match( + match.Key.ToIndex(seriesID), + matches, + ) +} + +func (match *match) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["match"] = match.leaf + return json.Marshal(data) +} + +func (match *match) String() string { + return jsonToString(match) +} + +type rangeOp struct { + *leaf + Opts index.RangeOpts +} + +func newRange(indexRule *database_v1.IndexRule, opts index.RangeOpts) *rangeOp { + return &rangeOp{ + leaf: &leaf{ + Key: newFieldKey(indexRule), + }, + Opts: opts, + } +} + +func (r *rangeOp) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + s, err := searcher(r.Key.Type) + if err != nil { + return nil, err + } + return s.Range(r.Key.ToIndex(seriesID), r.Opts) +} + +func (r *rangeOp) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + var builder strings.Builder + if r.Opts.Lower != nil { + if r.Opts.IncludesLower { + builder.WriteString("[") + } else { + builder.WriteString("(") + } + } + builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Lower)) + builder.WriteString(",") + builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Upper)) + if r.Opts.Upper != nil { + if r.Opts.IncludesUpper { + builder.WriteString("]") + } else { + builder.WriteString(")") + } + } + data["key"] = r.Key + data["range"] = builder.String() + return json.Marshal(data) +} + +func (r *rangeOp) String() string { + return jsonToString(r) +} + +func jsonToString(marshaler json.Marshaler) string { + bb, err := marshaler.MarshalJSON() + if err != nil { + return err.Error() + } + return string(bb) +} + +var ( + eNode = new(emptyNode) + bList = new(bypassList) +) + +type emptyNode struct{} + +func (an emptyNode) Execute(searcher index.GetSearcher, seriesID common.SeriesID) (posting.List, error) { + return bList, nil +} + +func (an emptyNode) String() string { + return "empty" +} + +type bypassList struct{} + +func (bl bypassList) Contains(id common.ItemID) bool { + // all items should be fetched + return true +} + +func (bl bypassList) IsEmpty() bool { + return false +} + +func (bl bypassList) Max() (common.ItemID, error) { + panic("not invoked") +} + +func (bl bypassList) Len() int { + panic("not invoked") +} + +func (bl bypassList) Iterator() posting.Iterator { + panic("not invoked") +} + +func (bl bypassList) Clone() posting.List { + panic("not invoked") +} + +func (bl bypassList) Equal(other posting.List) bool { + panic("not invoked") +} + +func (bl bypassList) Insert(i common.ItemID) { + panic("not invoked") +} + +func (bl bypassList) Intersect(other posting.List) error { + panic("not invoked") +} + +func (bl bypassList) Difference(other posting.List) error { + panic("not invoked") +} + +func (bl bypassList) Union(other posting.List) error { + panic("not invoked") +} + +func (bl bypassList) UnionMany(others []posting.List) error { + panic("not invoked") +} + +func (bl bypassList) AddIterator(iter posting.Iterator) error { + panic("not invoked") +} + +func (bl bypassList) AddRange(min common.ItemID, max common.ItemID) error { + panic("not invoked") +} + +func (bl bypassList) RemoveRange(min common.ItemID, max common.ItemID) error { + panic("not invoked") +} + +func (bl bypassList) Reset() { + panic("not invoked") +} + +func (bl bypassList) ToSlice() []common.ItemID { + panic("not invoked") +} + +func (bl bypassList) Marshall() ([]byte, error) { + panic("not invoked") +} + +func (bl bypassList) Unmarshall(data []byte) error { + panic("not invoked") +} + +func (bl bypassList) SizeInBytes() int64 { + panic("not invoked") +} diff --git a/pkg/query/logical/interface.go b/pkg/query/logical/interface.go index f572de8..2fce58b 100644 --- a/pkg/query/logical/interface.go +++ b/pkg/query/logical/interface.go @@ -19,21 +19,6 @@ package logical import ( "fmt" - - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" -) - -type PlanType uint8 - -const ( - PlanLimit PlanType = iota - PlanOffset - PlanLocalIndexScan - PlanGlobalIndexScan - PlanGroupBy - PlanAggregation - PlanTop - PlanTagFilter ) type UnresolvedPlan interface { @@ -42,8 +27,6 @@ type UnresolvedPlan interface { type Plan interface { fmt.Stringer - Type() PlanType - Equal(Plan) bool Children() []Plan Schema() Schema } @@ -65,7 +48,7 @@ type ResolvableExpr interface { } type ComparableExpr interface { - Expr - Compare(tagValue *modelv1.TagValue) (int, bool) - BelongTo(tagValue *modelv1.TagValue) bool + LiteralExpr + Compare(LiteralExpr) (int, bool) + BelongTo(LiteralExpr) bool } diff --git a/pkg/query/logical/measure_analyzer.go b/pkg/query/logical/measure_analyzer.go index fc82232..554f44f 100644 --- a/pkg/query/logical/measure_analyzer.go +++ b/pkg/query/logical/measure_analyzer.go @@ -22,10 +22,8 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/tsdb" - "github.com/apache/skywalking-banyandb/pkg/convert" ) type Field struct { @@ -166,8 +164,6 @@ func parseMeasureFields(criteria *measurev1.QueryRequest, metadata *commonv1.Met projFields[i] = NewField(fieldNameProj) } - var tagExprs []Expr - entityList := s.EntityList() entityMap := make(map[string]int) entity := make([]tsdb.Entry, len(entityList)) @@ -176,49 +172,9 @@ func parseMeasureFields(criteria *measurev1.QueryRequest, metadata *commonv1.Met // fill AnyEntry by default entity[idx] = tsdb.AnyEntry } - - for _, criteriaFamily := range criteria.GetCriteria() { - for _, pairQuery := range criteriaFamily.GetConditions() { - op := pairQuery.GetOp() - typedTagValue := pairQuery.GetValue() - var e Expr - switch v := typedTagValue.GetValue().(type) { - case *modelv1.TagValue_Str: - if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = []byte(v.Str.GetValue()) - } else { - e = Str(v.Str.GetValue()) - } - case *modelv1.TagValue_Id: - if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = []byte(v.Id.GetValue()) - } else { - e = ID(v.Id.GetValue()) - } - case *modelv1.TagValue_StrArray: - e = &strArrLiteral{ - arr: v.StrArray.GetValue(), - } - case *modelv1.TagValue_Int: - if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue()) - } else { - e = &int64Literal{ - int64: v.Int.GetValue(), - } - } - case *modelv1.TagValue_IntArray: - e = &int64ArrLiteral{ - arr: v.IntArray.GetValue(), - } - default: - return nil, ErrInvalidConditionType - } - // we collect Condition only if it is not a part of entity - if e != nil { - tagExprs = append(tagExprs, binaryOpFactory[op](NewTagRef(criteriaFamily.GetTagFamilyName(), pairQuery.GetName()), e)) - } - } + predicator, entities, err := buildLocalFilter(criteria.Criteria, s, entityMap, entity) + if err != nil { + return nil, err } // parse orderBy @@ -229,5 +185,5 @@ func parseMeasureFields(criteria *measurev1.QueryRequest, metadata *commonv1.Met } return MeasureIndexScan(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), metadata, - tagExprs, entity, projTags, projFields, groupByEntity, unresolvedOrderBy), nil + predicator, entities, projTags, projFields, groupByEntity, unresolvedOrderBy), nil } diff --git a/pkg/query/logical/measure_plan_indexscan_local.go b/pkg/query/logical/measure_plan_indexscan_local.go index 78a9c83..4f0f162 100644 --- a/pkg/query/logical/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure_plan_indexscan_local.go @@ -24,12 +24,10 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/pkg/errors" "go.uber.org/multierr" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" @@ -43,41 +41,15 @@ type unresolvedMeasureIndexScan struct { startTime time.Time endTime time.Time metadata *commonv1.Metadata - conditions []Expr + predicator Predicator projectionTags [][]*Tag projectionFields []*Field - entity tsdb.Entity + entities []tsdb.Entity groupByEntity bool unresolvedOrderBy *UnresolvedOrderBy } func (uis *unresolvedMeasureIndexScan) Analyze(s Schema) (Plan, error) { - localConditionMap := make(map[*databasev1.IndexRule][]Expr) - for _, cond := range uis.conditions { - if resolvable, ok := cond.(ResolvableExpr); ok { - err := resolvable.Resolve(s) - if err != nil { - return nil, err - } - - if bCond, ok := cond.(*binaryExpr); ok { - tag := bCond.l.(*TagRef).tag - if defined, indexObj := s.IndexDefined(tag); defined { - if indexObj.GetLocation() == databasev1.IndexRule_LOCATION_SERIES { - if v, exist := localConditionMap[indexObj]; exist { - v = append(v, cond) - localConditionMap[indexObj] = v - } else { - localConditionMap[indexObj] = []Expr{cond} - } - } - } else { - return nil, errors.Wrap(ErrIndexNotDefined, tag.GetCompoundName()) - } - } - } - } - var projTagsRefs [][]*TagRef if len(uis.projectionTags) > 0 { var err error @@ -108,8 +80,8 @@ func (uis *unresolvedMeasureIndexScan) Analyze(s Schema) (Plan, error) { projectionTagsRefs: projTagsRefs, projectionFieldsRefs: projFieldRefs, metadata: uis.metadata, - conditionMap: localConditionMap, - entity: uis.entity, + predicator: uis.predicator, + entities: uis.entities, groupByEntity: uis.groupByEntity, orderBy: orderBySubPlan, }, nil @@ -122,15 +94,15 @@ type localMeasureIndexScan struct { timeRange timestamp.TimeRange schema Schema metadata *commonv1.Metadata - conditionMap map[*databasev1.IndexRule][]Expr + predicator Predicator projectionTagsRefs [][]*TagRef projectionFieldsRefs []*FieldRef - entity tsdb.Entity + entities []tsdb.Entity groupByEntity bool } func (i *localMeasureIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.MIterator, error) { - shards, err := ec.Shards(i.entity) + shards, err := ec.Shards(i.entities) if err != nil { return nil, err } @@ -255,17 +227,17 @@ func (i *localMeasureIndexScan) Equal(plan Plan) bool { cmp.Equal(i.orderBy, other.orderBy) } -func MeasureIndexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, conditions []Expr, entity tsdb.Entity, +func MeasureIndexScan(startTime, endTime time.Time, metadata *commonv1.Metadata, predicator Predicator, entities []tsdb.Entity, projectionTags [][]*Tag, projectionFields []*Field, groupByEntity bool, unresolvedOrderBy *UnresolvedOrderBy, ) UnresolvedPlan { return &unresolvedMeasureIndexScan{ startTime: startTime, endTime: endTime, metadata: metadata, - conditions: conditions, + predicator: predicator, projectionTags: projectionTags, projectionFields: projectionFields, - entity: entity, + entities: entities, groupByEntity: groupByEntity, unresolvedOrderBy: unresolvedOrderBy, } diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go index 534f594..5e04600 100644 --- a/pkg/query/logical/schema.go +++ b/pkg/query/logical/schema.go @@ -29,7 +29,7 @@ import ( type Schema interface { Scope() tsdb.Entry EntityList() []string - IndexDefined(*Tag) (bool, *databasev1.IndexRule) + IndexDefined(tagName string) (bool, *databasev1.IndexRule) IndexRuleDefined(string) (bool, *databasev1.IndexRule) CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error) CreateFieldRef(fields ...*Field) ([]*FieldRef, error) @@ -105,10 +105,10 @@ func (cs *commonSchema) EntityList() []string { } // IndexDefined checks whether the field given is indexed -func (cs *commonSchema) IndexDefined(tag *Tag) (bool, *databasev1.IndexRule) { +func (cs *commonSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { for _, idxRule := range cs.indexRules { for _, tagName := range idxRule.GetTags() { - if tag.GetTagName() == tagName { + if tagName == tagName { return true, idxRule } } @@ -167,8 +167,8 @@ func (s *streamSchema) TraceIDFieldName() string { } // IndexDefined checks whether the field given is indexed -func (s *streamSchema) IndexDefined(tag *Tag) (bool, *databasev1.IndexRule) { - return s.common.IndexDefined(tag) +func (s *streamSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { + return s.common.IndexDefined(tagName) } func (s *streamSchema) Equal(s2 Schema) bool { @@ -236,8 +236,8 @@ func (m *measureSchema) EntityList() []string { return m.common.EntityList() } -func (m *measureSchema) IndexDefined(tag *Tag) (bool, *databasev1.IndexRule) { - return m.common.IndexDefined(tag) +func (m *measureSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) { + return m.common.IndexDefined(tagName) } func (m *measureSchema) IndexRuleDefined(indexRuleName string) (bool, *databasev1.IndexRule) { diff --git a/pkg/query/logical/stream_analyzer.go b/pkg/query/logical/stream_analyzer.go index d8f5883..31acd98 100644 --- a/pkg/query/logical/stream_analyzer.go +++ b/pkg/query/logical/stream_analyzer.go @@ -21,11 +21,8 @@ import ( "context" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/tsdb" - "github.com/apache/skywalking-banyandb/pkg/convert" ) var DefaultLimit uint32 = 100 @@ -155,57 +152,6 @@ func parseStreamFields(criteria *streamv1.QueryRequest, metadata *commonv1.Metad projTags[i] = projTagInFamily } - var tagExprs []Expr - - entityList := s.EntityList() - entityMap := make(map[string]int) - entity := make([]tsdb.Entry, len(entityList)) - for idx, e := range entityList { - entityMap[e] = idx - // fill AnyEntry by default - entity[idx] = tsdb.AnyEntry - } - - for _, criteriaFamily := range criteria.GetCriteria() { - for _, pairQuery := range criteriaFamily.GetConditions() { - op := pairQuery.GetOp() - typedTagValue := pairQuery.GetValue() - var e Expr - switch v := typedTagValue.GetValue().(type) { - case *modelv1.TagValue_Str: - if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = []byte(v.Str.GetValue()) - } else { - e = &strLiteral{ - string: v.Str.GetValue(), - } - } - case *modelv1.TagValue_StrArray: - e = &strArrLiteral{ - arr: v.StrArray.GetValue(), - } - case *modelv1.TagValue_Int: - if entityIdx, ok := entityMap[pairQuery.GetName()]; ok { - entity[entityIdx] = convert.Int64ToBytes(v.Int.GetValue()) - } else { - e = &int64Literal{ - int64: v.Int.GetValue(), - } - } - case *modelv1.TagValue_IntArray: - e = &int64ArrLiteral{ - arr: v.IntArray.GetValue(), - } - default: - return nil, ErrInvalidConditionType - } - // we collect Condition only if it is not a part of entity - if e != nil { - tagExprs = append(tagExprs, binaryOpFactory[op](NewTagRef(criteriaFamily.GetTagFamilyName(), pairQuery.GetName()), e)) - } - } - } - return TagFilter(timeRange.GetBegin().AsTime(), timeRange.GetEnd().AsTime(), metadata, - tagExprs, entity, nil, projTags...), nil + criteria.Criteria, nil, projTags...), nil } diff --git a/pkg/query/logical/stream_plan.go b/pkg/query/logical/stream_plan.go index eed1c4b..baaac8d 100644 --- a/pkg/query/logical/stream_plan.go +++ b/pkg/query/logical/stream_plan.go @@ -52,17 +52,6 @@ func (l *limit) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element return entities, nil } -func (l *limit) Equal(plan Plan) bool { - if plan.Type() != PlanLimit { - return false - } - other := plan.(*limit) - if l.limitNum == other.limitNum { - return l.input.Equal(other.input) - } - return false -} - func (l *limit) Analyze(s Schema) (Plan, error) { var err error l.input, err = l.unresolvedInput.Analyze(s) @@ -84,10 +73,6 @@ func (l *limit) Children() []Plan { return []Plan{l.input} } -func (l *limit) Type() PlanType { - return PlanLimit -} - func Limit(input UnresolvedPlan, num uint32) UnresolvedPlan { return &limit{ parent: &parent{ @@ -120,17 +105,6 @@ func (l *offset) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Elemen return []*streamv1.Element{}, nil } -func (l *offset) Equal(plan Plan) bool { - if plan.Type() != PlanOffset { - return false - } - other := plan.(*offset) - if l.offsetNum == other.offsetNum { - return l.input.Equal(other.input) - } - return false -} - func (l *offset) Analyze(s Schema) (Plan, error) { var err error l.input, err = l.unresolvedInput.Analyze(s) @@ -152,10 +126,6 @@ func (l *offset) Children() []Plan { return []Plan{l.input} } -func (l *offset) Type() PlanType { - return PlanOffset -} - func Offset(input UnresolvedPlan, num uint32) UnresolvedPlan { return &offset{ parent: &parent{ diff --git a/pkg/query/logical/stream_plan_indexscan_global.go b/pkg/query/logical/stream_plan_indexscan_global.go index cfe5504..abf8173 100644 --- a/pkg/query/logical/stream_plan_indexscan_global.go +++ b/pkg/query/logical/stream_plan_indexscan_global.go @@ -22,7 +22,6 @@ import ( "io" "time" - "github.com/google/go-cmp/cmp" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" @@ -45,10 +44,6 @@ type globalIndexScan struct { } func (t *globalIndexScan) String() string { - if len(t.projectionTagRefs) == 0 { - return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},condition=%s; projection=None", - t.metadata.GetGroup(), t.metadata.GetName(), t.expr.String()) - } return fmt.Sprintf("GlobalIndexScan: Metadata{group=%s,name=%s},conditions=%s; projection=%s", t.metadata.GetGroup(), t.metadata.GetName(), t.expr.String(), formatTagRefs(", ", t.projectionTagRefs...)) @@ -58,27 +53,10 @@ func (t *globalIndexScan) Children() []Plan { return []Plan{} } -func (t *globalIndexScan) Type() PlanType { - return PlanGlobalIndexScan -} - func (t *globalIndexScan) Schema() Schema { return t.schema } -func (t *globalIndexScan) Equal(plan Plan) bool { - if plan.Type() != PlanGlobalIndexScan { - return false - } - other := plan.(*globalIndexScan) - return t.metadata.GetGroup() == other.metadata.GetGroup() && - t.metadata.GetName() == other.metadata.GetName() && - cmp.Equal(t.projectionTagRefs, other.projectionTagRefs) && - cmp.Equal(t.schema, other.schema) && - cmp.Equal(t.globalIndexRule.GetMetadata().GetId(), other.globalIndexRule.GetMetadata().GetId()) && - cmp.Equal(t.expr, other.expr) -} - func (t *globalIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { shards, err := ec.Shards(nil) if err != nil { diff --git a/pkg/query/logical/stream_plan_indexscan_local.go b/pkg/query/logical/stream_plan_indexscan_local.go index bec23df..499cae6 100644 --- a/pkg/query/logical/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream_plan_indexscan_local.go @@ -18,16 +18,12 @@ package logical import ( - "bytes" "fmt" - "strings" "time" - "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" @@ -42,26 +38,47 @@ type localIndexScan struct { timeRange timestamp.TimeRange schema Schema metadata *commonv1.Metadata - conditionMap map[*databasev1.IndexRule][]Expr projectionTagRefs [][]*TagRef - entity tsdb.Entity + entities []tsdb.Entity + filter index.Filter } func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { - shards, err := ec.Shards(i.entity) - if err != nil { - return nil, err - } - var iters []tsdb.Iterator - for _, shard := range shards { - itersInShard, innerErr := i.executeInShard(shard) - if innerErr != nil { - return nil, innerErr + var seriesList tsdb.SeriesList + for _, e := range i.entities { + shards, err := ec.Shards(e) + if err != nil { + return nil, err } - if itersInShard == nil { - continue + for _, shard := range shards { + sl, err := shard.Series().List(tsdb.NewPath(e)) + if err != nil { + return nil, err + } + seriesList = append(seriesList, sl...) } - iters = append(iters, itersInShard...) + } + if len(seriesList) == 0 { + return nil, nil + } + var builders []seekerBuilder + if i.index != nil { + builders = append(builders, func(builder tsdb.SeekerBuilder) { + builder.OrderByIndex(i.index, i.sort) + }) + } else { + builders = append(builders, func(builder tsdb.SeekerBuilder) { + builder.OrderByTime(i.sort) + }) + } + if i.filter != nil { + builders = append(builders, func(b tsdb.SeekerBuilder) { + b.Filter(i.filter) + }) + } + iters, innerErr := executeForShard(seriesList, i.timeRange, builders...) + if innerErr != nil { + return nil, innerErr } var elems []*streamv1.Element @@ -91,59 +108,10 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv return elems, nil } -func (i *localIndexScan) executeInShard(shard tsdb.Shard) ([]tsdb.Iterator, error) { - seriesList, err := shard.Series().List(tsdb.NewPath(i.entity)) - if err != nil { - return nil, err - } - - if len(seriesList) == 0 { - return nil, nil - } - - var builders []seekerBuilder - - if i.index != nil { - builders = append(builders, func(builder tsdb.SeekerBuilder) { - builder.OrderByIndex(i.index, i.sort) - }) - } else { - builders = append(builders, func(builder tsdb.SeekerBuilder) { - builder.OrderByTime(i.sort) - }) - } - - if i.conditionMap != nil && len(i.conditionMap) > 0 { - builders = append(builders, func(b tsdb.SeekerBuilder) { - for idxRule, exprs := range i.conditionMap { - b.Filter(idxRule, exprToCondition(exprs)) - } - }) - } - - return executeForShard(seriesList, i.timeRange, builders...) -} - func (i *localIndexScan) String() string { - exprStr := make([]string, 0, len(i.conditionMap)) - for _, conditions := range i.conditionMap { - var conditionStr []string - for _, cond := range conditions { - conditionStr = append(conditionStr, cond.String()) - } - exprStr = append(exprStr, fmt.Sprintf("(%s)", strings.Join(conditionStr, " AND "))) - } - if len(i.projectionTagRefs) == 0 { - return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=None", - i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), strings.Join(exprStr, " AND ")) - } return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), - strings.Join(exprStr, " AND "), formatTagRefs(", ", i.projectionTagRefs...)) -} - -func (i *localIndexScan) Type() PlanType { - return PlanLocalIndexScan + i.filter, formatTagRefs(", ", i.projectionTagRefs...)) } func (i *localIndexScan) Children() []Plan { @@ -156,44 +124,3 @@ func (i *localIndexScan) Schema() Schema { } return i.schema.ProjTags(i.projectionTagRefs...) } - -func (i *localIndexScan) Equal(plan Plan) bool { - if plan.Type() != PlanLocalIndexScan { - return false - } - other := plan.(*localIndexScan) - return i.metadata.GetGroup() == other.metadata.GetGroup() && - i.metadata.GetName() == other.metadata.GetName() && - i.timeRange.Start.UnixNano() == other.timeRange.Start.UnixNano() && - i.timeRange.End.UnixNano() == other.timeRange.End.UnixNano() && - len(i.entity) == len(other.entity) && - bytes.Equal(i.entity.Marshal(), other.entity.Marshal()) && - cmp.Equal(i.projectionTagRefs, other.projectionTagRefs) && - cmp.Equal(i.schema, other.schema) && - cmp.Equal(i.conditionMap, other.conditionMap) && - cmp.Equal(i.orderBy, other.orderBy) -} - -func exprToCondition(exprs []Expr) tsdb.Condition { - cond := make(map[string][]index.ConditionValue) - for _, expr := range exprs { - bExpr := expr.(*binaryExpr) - l := bExpr.l.(*TagRef) - r := bExpr.r.(LiteralExpr) - if existingList, ok := cond[l.tag.GetTagName()]; ok { - existingList = append(existingList, index.ConditionValue{ - Values: r.Bytes(), - Op: bExpr.op, - }) - cond[l.tag.GetTagName()] = existingList - } else { - cond[l.tag.GetTagName()] = []index.ConditionValue{ - { - Values: r.Bytes(), - Op: bExpr.op, - }, - } - } - } - return cond -} diff --git a/pkg/query/logical/stream_plan_tag_filter.go b/pkg/query/logical/stream_plan_tag_filter.go index 33f0971..3138c39 100644 --- a/pkg/query/logical/stream_plan_tag_filter.go +++ b/pkg/query/logical/stream_plan_tag_filter.go @@ -25,6 +25,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -36,43 +37,28 @@ type unresolvedTagFilter struct { startTime time.Time endTime time.Time metadata *commonv1.Metadata - conditions []Expr projectionTags [][]*Tag - entity tsdb.Entity + criteria *modelv1.Criteria } func (uis *unresolvedTagFilter) Analyze(s Schema) (Plan, error) { ctx := newStreamAnalyzerContext(s) - for _, cond := range uis.conditions { - resolvable, ok := cond.(ResolvableExpr) - if !ok { - continue - } - err := resolvable.Resolve(s) - if err != nil { + entityList := s.EntityList() + entityDict := make(map[string]int) + entity := make([]tsdb.Entry, len(entityList)) + for idx, e := range entityList { + entityDict[e] = idx + // fill AnyEntry by default + entity[idx] = tsdb.AnyEntry + } + var err error + ctx.filter, ctx.entities, err = buildLocalFilter(uis.criteria, s, entityDict, entity) + if err != nil { + if ge, ok := err.(*globalIndexError); ok { + ctx.globalConditions = append(ctx.globalConditions, ge.indexRule, ge.expr) + } else { return nil, err } - bCond, ok := cond.(*binaryExpr) - if !ok { - continue - } - tag := bCond.l.(*TagRef).tag - defined, indexObj := s.IndexDefined(tag) - if !defined { - ctx.tagFilters = append(ctx.tagFilters, bCond) - continue - } - switch indexObj.GetLocation() { - case databasev1.IndexRule_LOCATION_SERIES: - if v, exist := ctx.localConditionMap[indexObj]; exist { - v = append(v, cond) - ctx.localConditionMap[indexObj] = v - } else { - ctx.localConditionMap[indexObj] = []Expr{cond} - } - case databasev1.IndexRule_LOCATION_GLOBAL: - ctx.globalConditions = append(ctx.globalConditions, indexObj, cond) - } } if len(uis.projectionTags) > 0 { @@ -86,15 +72,17 @@ func (uis *unresolvedTagFilter) Analyze(s Schema) (Plan, error) { if err != nil { return nil, err } - if len(ctx.tagFilters) > 0 { - plan = NewTagFilter(s, plan, ctx.tagFilters) + tagFilter, err := buildTagFilter(uis.criteria, s) + if err != nil { + return nil, err } + plan = NewTagFilter(s, plan, tagFilter) return plan, err } func (uis *unresolvedTagFilter) selectIndexScanner(ctx *streamAnalyzeContext) (Plan, error) { if len(ctx.globalConditions) > 0 { - if len(ctx.globalConditions)/2 > 1 { + if len(ctx.globalConditions) > 2 { return nil, ErrMultipleGlobalIndexes } return &globalIndexScan{ @@ -102,7 +90,7 @@ func (uis *unresolvedTagFilter) selectIndexScanner(ctx *streamAnalyzeContext) (P projectionTagRefs: ctx.projTagsRefs, metadata: uis.metadata, globalIndexRule: ctx.globalConditions[0].(*databasev1.IndexRule), - expr: ctx.globalConditions[1].(Expr), + expr: ctx.globalConditions[1].(LiteralExpr), }, nil } @@ -118,12 +106,12 @@ func (uis *unresolvedTagFilter) selectIndexScanner(ctx *streamAnalyzeContext) (P schema: ctx.s, projectionTagRefs: ctx.projTagsRefs, metadata: uis.metadata, - conditionMap: ctx.localConditionMap, - entity: uis.entity, + filter: ctx.filter, + entities: ctx.entities, }, nil } -func TagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, conditions []Expr, entity tsdb.Entity, +func TagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, criteria *modelv1.Criteria, orderBy *UnresolvedOrderBy, projection ...[]*Tag, ) UnresolvedPlan { return &unresolvedTagFilter{ @@ -131,139 +119,71 @@ func TagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, condit startTime: startTime, endTime: endTime, metadata: metadata, - conditions: conditions, + criteria: criteria, projectionTags: projection, - entity: entity, - } -} - -// GlobalIndexScan is a short-handed method for composing a globalIndexScan plan -func GlobalIndexScan(metadata *commonv1.Metadata, conditions []Expr, projection ...[]*Tag) UnresolvedPlan { - return &unresolvedTagFilter{ - metadata: metadata, - conditions: conditions, - projectionTags: projection, } } type streamAnalyzeContext struct { - s Schema - localConditionMap map[*databasev1.IndexRule][]Expr - globalConditions []interface{} - projTagsRefs [][]*TagRef - tagFilters []*binaryExpr + s Schema + filter index.Filter + entities []tsdb.Entity + globalConditions []interface{} + projTagsRefs [][]*TagRef } func newStreamAnalyzerContext(s Schema) *streamAnalyzeContext { return &streamAnalyzeContext{ - localConditionMap: make(map[*databasev1.IndexRule][]Expr), - globalConditions: make([]interface{}, 0), - s: s, + globalConditions: make([]interface{}, 0), + s: s, } } var ( - _ Plan = (*tagFilter)(nil) - _ executor.StreamExecutable = (*tagFilter)(nil) + _ Plan = (*tagFilterPlan)(nil) + _ executor.StreamExecutable = (*tagFilterPlan)(nil) ) -type tagFilter struct { - s Schema - parent Plan - tagFilters []*binaryExpr +type tagFilterPlan struct { + s Schema + parent Plan + tagFilter tagFilter } -func NewTagFilter(s Schema, parent Plan, tagFilters []*binaryExpr) Plan { - return &tagFilter{ - s: s, - parent: parent, - tagFilters: tagFilters, +func NewTagFilter(s Schema, parent Plan, tagFilter tagFilter) Plan { + return &tagFilterPlan{ + s: s, + parent: parent, + tagFilter: tagFilter, } } -func (t *tagFilter) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { +func (t *tagFilterPlan) Execute(ec executor.StreamExecutionContext) ([]*streamv1.Element, error) { entities, err := t.parent.(executor.StreamExecutable).Execute(ec) if err != nil { return nil, err } filteredElements := make([]*streamv1.Element, 0) for _, e := range entities { - if t.check(e) { + ok, err := t.tagFilter.match(e.TagFamilies) + if err != nil { + return nil, err + } + if ok { filteredElements = append(filteredElements, e) } } return filteredElements, nil } -func (*tagFilter) String() string { - panic("unimplemented") +func (t *tagFilterPlan) String() string { + return t.tagFilter.String() } -func (t *tagFilter) Children() []Plan { +func (t *tagFilterPlan) Children() []Plan { return []Plan{t.parent} } -func (*tagFilter) Equal(another Plan) bool { - panic("unimplemented") -} - -func (t *tagFilter) Schema() Schema { +func (t *tagFilterPlan) Schema() Schema { return t.s } - -func (*tagFilter) Type() PlanType { - return PlanTagFilter -} - -func (t *tagFilter) check(element *streamv1.Element) bool { - compare := func(ce ComparableExpr, tagValue *modelv1.TagValue, exp func(result int) bool) bool { - c, b := ce.Compare(tagValue) - if b { - return exp(c) - } - return false - } - for _, filter := range t.tagFilters { - l := filter.l.(*TagRef) - tagValue, exist := tagValue(element, l) - if !exist { - return false - } - r, ok := filter.r.(ComparableExpr) - if !ok { - continue - } - switch filter.op { - case modelv1.Condition_BINARY_OP_EQ: - return compare(r, tagValue, func(c int) bool { return c == 0 }) - case modelv1.Condition_BINARY_OP_GE: - return compare(r, tagValue, func(c int) bool { return c >= 0 }) - case modelv1.Condition_BINARY_OP_GT: - return compare(r, tagValue, func(c int) bool { return c > 0 }) - case modelv1.Condition_BINARY_OP_LE: - return compare(r, tagValue, func(c int) bool { return c <= 0 }) - case modelv1.Condition_BINARY_OP_LT: - return compare(r, tagValue, func(c int) bool { return c < 0 }) - case modelv1.Condition_BINARY_OP_HAVING: - return r.BelongTo(tagValue) - case modelv1.Condition_BINARY_OP_NOT_HAVING: - return !r.BelongTo(tagValue) - } - } - - return true -} - -func tagValue(element *streamv1.Element, tagRef *TagRef) (*modelv1.TagValue, bool) { - for _, tf := range element.TagFamilies { - if tf.Name != tagRef.tag.familyName { - continue - } - for _, t := range tf.Tags { - if t.Key == tagRef.tag.name { - return t.Value, true - } - } - } - return nil, false -} diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go new file mode 100644 index 0000000..8bd2a35 --- /dev/null +++ b/pkg/query/logical/tag_filter.go @@ -0,0 +1,417 @@ +package logical + +import ( + "encoding/json" + "fmt" + "strings" + + model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" +) + +type tagFilter interface { + fmt.Stringer + match(tagFamilies []*model_v1.TagFamily) (bool, error) +} + +func buildTagFilter(criteria *model_v1.Criteria, schema Schema) (tagFilter, error) { + switch criteria.GetExp().(type) { + case *model_v1.Criteria_Condition: + cond := criteria.GetCondition() + expr, err := parseExpr(cond.Value) + if err != nil { + return nil, err + } + if ok, _ := schema.IndexDefined(cond.Name); ok { + return bypassFilter, nil + } + return parseFilter(cond, expr) + case *model_v1.Criteria_Le: + le := criteria.GetLe() + left, err := buildTagFilter(le.Left, schema) + if err != nil { + return nil, err + } + right, err := buildTagFilter(le.Left, schema) + if err != nil { + return nil, err + } + switch le.Op { + case model_v1.LogicalExpression_LOGICAL_OP_AND: + and := newAndLogicalNode(2) + and.append(left).append(right) + return and, nil + case model_v1.LogicalExpression_LOGICAL_OP_OR: + or := newOrLogicalNode(2) + or.append(left).append(right) + return or, nil + } + + } + return nil, ErrInvalidConditionType +} + +func parseFilter(cond *model_v1.Condition, expr ComparableExpr) (tagFilter, error) { + switch cond.Op { + case model_v1.Condition_BINARY_OP_GT: + return newRangeTag(cond.Name, RangeOpts{ + Lower: expr, + }), nil + case model_v1.Condition_BINARY_OP_GE: + return newRangeTag(cond.Name, RangeOpts{ + IncludesLower: true, + Lower: expr, + }), nil + case model_v1.Condition_BINARY_OP_LT: + return newRangeTag(cond.Name, RangeOpts{ + Upper: expr, + }), nil + case model_v1.Condition_BINARY_OP_LE: + return newRangeTag(cond.Name, RangeOpts{ + IncludesUpper: true, + Upper: expr, + }), nil + case model_v1.Condition_BINARY_OP_EQ: + return newEqTag(cond.Name, expr), nil + case model_v1.Condition_BINARY_OP_NE: + return newNotTag(newEqTag(cond.Name, expr)), nil + case model_v1.Condition_BINARY_OP_HAVING: + return newHavingTag(cond.Name, expr), nil + case model_v1.Condition_BINARY_OP_NOT_HAVING: + return newNotTag(newHavingTag(cond.Name, expr)), nil + } + return nil, ErrInvalidConditionType +} + +func parseExpr(value *model_v1.TagValue) (ComparableExpr, error) { + switch v := value.Value.(type) { + case *model_v1.TagValue_Str: + return &strLiteral{v.Str.GetValue()}, nil + case *model_v1.TagValue_Id: + return &idLiteral{v.Id.GetValue()}, nil + case *model_v1.TagValue_StrArray: + return &strArrLiteral{ + arr: v.StrArray.GetValue(), + }, nil + case *model_v1.TagValue_Int: + return &int64Literal{ + int64: v.Int.GetValue(), + }, nil + case *model_v1.TagValue_IntArray: + return &int64ArrLiteral{ + arr: v.IntArray.GetValue(), + }, nil + } + return nil, ErrInvalidConditionType +} + +var bypassFilter = new(emptyFilter) + +type emptyFilter struct{} + +func (emptyFilter) match(_ []*model_v1.TagFamily) (bool, error) { return true, nil } + +func (emptyFilter) String() string { return "true" } + +type logicalNodeOP interface { + tagFilter + merge(bool) +} + +type logicalNode struct { + result *bool + SubNodes []tagFilter `json:"sub_nodes,omitempty"` +} + +func (n *logicalNode) append(sub tagFilter) *logicalNode { + n.SubNodes = append(n.SubNodes, sub) + return n +} + +func (n *logicalNode) pop() (tagFilter, bool) { + if len(n.SubNodes) < 1 { + return nil, false + } + sn := n.SubNodes[0] + n.SubNodes = n.SubNodes[1:] + return sn, true +} + +func matchTag(tagFamilies []*model_v1.TagFamily, n *logicalNode, lp logicalNodeOP) (bool, error) { + ex, hasNext := n.pop() + if !hasNext { + if n.result == nil { + return true, nil + } + return *n.result, nil + } + r, err := ex.match(tagFamilies) + if err != nil { + return false, err + } + if n.result == nil { + n.result = &r + return lp.match(tagFamilies) + } + lp.merge(r) + return lp.match(tagFamilies) +} + +type andLogicalNode struct { + *logicalNode +} + +func newAndLogicalNode(size int) *andLogicalNode { + return &andLogicalNode{ + logicalNode: &logicalNode{ + SubNodes: make([]tagFilter, 0, size), + }, + } +} + +func (an *andLogicalNode) merge(b bool) { + r := *an.result && b + an.result = &r +} + +func (an *andLogicalNode) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + return matchTag(tagFamilies, an.logicalNode, an) +} + +func (an *andLogicalNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["and"] = an.logicalNode.SubNodes + return json.Marshal(data) +} + +func (an *andLogicalNode) String() string { + return jsonToString(an) +} + +type orLogicalNode struct { + *logicalNode +} + +func newOrLogicalNode(size int) *orLogicalNode { + return &orLogicalNode{ + logicalNode: &logicalNode{ + SubNodes: make([]tagFilter, 0, size), + }, + } +} + +func (on *orLogicalNode) merge(b bool) { + r := *on.result || b + on.result = &r +} + +func (on *orLogicalNode) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + return matchTag(tagFamilies, on.logicalNode, on) +} + +func (on *orLogicalNode) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["or"] = on.logicalNode.SubNodes + return json.Marshal(data) +} + +func (on *orLogicalNode) String() string { + return jsonToString(on) +} + +type tagLeaf struct { + tagFilter + Name string + Expr LiteralExpr +} + +type notTag struct { + tagFilter + Inner tagFilter +} + +func newNotTag(inner tagFilter) *notTag { + return ¬Tag{ + Inner: inner, + } +} + +func (n *notTag) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + b, err := n.Inner.match(tagFamilies) + if err != nil { + return false, err + } + return !b, nil +} + +func (n *notTag) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["not"] = n.Inner + return json.Marshal(data) +} + +func (n *notTag) String() string { + return jsonToString(n) +} + +type eqTag struct { + *tagLeaf +} + +func newEqTag(tagName string, values LiteralExpr) *eqTag { + return &eqTag{ + tagLeaf: &tagLeaf{ + Name: tagName, + Expr: values, + }, + } +} + +func (eq *eqTag) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + expr, err := tagExpr(tagFamilies, eq.Name) + if err != nil { + return false, err + } + return eq.Expr.Equal(expr), nil +} + +func (eq *eqTag) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["eq"] = eq.tagLeaf + return json.Marshal(data) +} + +func (eq *eqTag) String() string { + return jsonToString(eq) +} + +type RangeOpts struct { + Upper ComparableExpr + Lower ComparableExpr + IncludesUpper bool + IncludesLower bool +} + +type rangeTag struct { + *tagLeaf + Opts RangeOpts +} + +func newRangeTag(tagName string, opts RangeOpts) *rangeTag { + return &rangeTag{ + tagLeaf: &tagLeaf{ + Name: tagName, + }, + Opts: opts, + } +} + +func (r *rangeTag) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + expr, err := tagExpr(tagFamilies, r.Name) + if err != nil { + return false, err + } + if r.Opts.Lower != nil { + lower := r.Opts.Lower + c, b := lower.Compare(expr) + if !b { + return false, nil + } + if r.Opts.IncludesLower { + if c > 0 { + return false, nil + } + } else { + if c >= 0 { + return false, nil + } + } + } + if r.Opts.Upper != nil { + upper := r.Opts.Upper + c, b := upper.Compare(expr) + if !b { + return false, nil + } + if r.Opts.IncludesUpper { + if c < 0 { + return false, nil + } + } else { + if c <= 0 { + return false, nil + } + } + } + return true, nil +} + +func (r *rangeTag) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + var builder strings.Builder + if r.Opts.Lower != nil { + if r.Opts.IncludesLower { + builder.WriteString("[") + } else { + builder.WriteString("(") + } + builder.WriteString(r.Opts.Lower.String()) + } + if r.Opts.Upper != nil { + builder.WriteString(",") + builder.WriteString(r.Opts.Upper.String()) + if r.Opts.IncludesUpper { + builder.WriteString("]") + } else { + builder.WriteString(")") + } + } + data["key"] = r.tagLeaf + data["range"] = builder.String() + return json.Marshal(data) +} + +func (r *rangeTag) String() string { + return jsonToString(r) +} + +func tagExpr(tagFamilies []*model_v1.TagFamily, tagName string) (ComparableExpr, error) { + for _, tf := range tagFamilies { + for _, t := range tf.Tags { + if t.Key == tagName { + return parseExpr(t.Value) + } + } + } + return nil, ErrTagNotDefined +} + +type havingTag struct { + *tagLeaf +} + +func newHavingTag(tagName string, values LiteralExpr) *havingTag { + return &havingTag{ + tagLeaf: &tagLeaf{ + Name: tagName, + Expr: values, + }, + } +} + +func (h *havingTag) match(tagFamilies []*model_v1.TagFamily) (bool, error) { + expr, err := tagExpr(tagFamilies, h.Name) + if err != nil { + return false, err + } + return expr.BelongTo(h.Expr), nil +} + +func (h *havingTag) MarshalJSON() ([]byte, error) { + data := make(map[string]interface{}, 1) + data["having"] = h.tagLeaf + return json.Marshal(data) +} + +func (h *havingTag) String() string { + return jsonToString(h) +}