This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch api-resource in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bab2b84bc568f8fa6e77ecd0d1fb48e6797385ac Author: Gao Hongtao <[email protected]> AuthorDate: Wed Sep 21 09:50:10 2022 +0000 Add segment interval options Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/common/v1/common.pb.go | 292 ++++++++++++++++----- api/proto/banyandb/common/v1/common.pb.validate.go | 274 ++++++++++++++++++- api/proto/banyandb/common/v1/common.proto | 27 +- .../openapi/banyandb/database/v1/rpc.swagger.json | 36 ++- banyand/measure/metadata.go | 8 + banyand/metadata/schema/testdata/group.json | 13 +- banyand/stream/metadata.go | 9 + banyand/tsdb/tsdb.go | 18 +- banyand/tsdb/tsdb_test.go | 2 + bydbctl/internal/cmd/group.go | 3 +- bydbctl/internal/cmd/stream_test.go | 14 +- docs/api-reference.md | 36 ++- pkg/pb/v1/metadata.go | 79 +++--- pkg/test/measure/testdata/groups/sw_metric.json | 14 +- pkg/test/stream/testdata/group.json | 13 +- 15 files changed, 707 insertions(+), 131 deletions(-) diff --git a/api/proto/banyandb/common/v1/common.pb.go b/api/proto/banyandb/common/v1/common.pb.go index c2b6f11..9f8985f 100644 --- a/api/proto/banyandb/common/v1/common.pb.go +++ b/api/proto/banyandb/common/v1/common.pb.go @@ -88,6 +88,55 @@ func (Catalog) EnumDescriptor() ([]byte, []int) { return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{0} } +type IntervalRule_Unit int32 + +const ( + IntervalRule_UNIT_UNSPECIFIED IntervalRule_Unit = 0 + IntervalRule_UNIT_HOUR IntervalRule_Unit = 1 + IntervalRule_UNIT_DAY IntervalRule_Unit = 2 +) + +// Enum value maps for IntervalRule_Unit. +var ( + IntervalRule_Unit_name = map[int32]string{ + 0: "UNIT_UNSPECIFIED", + 1: "UNIT_HOUR", + 2: "UNIT_DAY", + } + IntervalRule_Unit_value = map[string]int32{ + "UNIT_UNSPECIFIED": 0, + "UNIT_HOUR": 1, + "UNIT_DAY": 2, + } +) + +func (x IntervalRule_Unit) Enum() *IntervalRule_Unit { + p := new(IntervalRule_Unit) + *p = x + return p +} + +func (x IntervalRule_Unit) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (IntervalRule_Unit) Descriptor() protoreflect.EnumDescriptor { + return file_banyandb_common_v1_common_proto_enumTypes[1].Descriptor() +} + +func (IntervalRule_Unit) Type() protoreflect.EnumType { + return &file_banyandb_common_v1_common_proto_enumTypes[1] +} + +func (x IntervalRule_Unit) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use IntervalRule_Unit.Descriptor instead. +func (IntervalRule_Unit) EnumDescriptor() ([]byte, []int) { + return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{1, 0} +} + // Metadata is for multi-tenant, multi-model use type Metadata struct { state protoimpl.MessageState @@ -172,6 +221,63 @@ func (x *Metadata) GetModRevision() int64 { return 0 } +// IntervalRule is a structured duration +type IntervalRule struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // unit can only be UNIT_HOUR or UNIT_DAY + Unit IntervalRule_Unit `protobuf:"varint,1,opt,name=unit,proto3,enum=banyandb.common.v1.IntervalRule_Unit" json:"unit,omitempty"` + Num uint32 `protobuf:"varint,2,opt,name=num,proto3" json:"num,omitempty"` +} + +func (x *IntervalRule) Reset() { + *x = IntervalRule{} + if protoimpl.UnsafeEnabled { + mi := &file_banyandb_common_v1_common_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IntervalRule) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IntervalRule) ProtoMessage() {} + +func (x *IntervalRule) ProtoReflect() protoreflect.Message { + mi := &file_banyandb_common_v1_common_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IntervalRule.ProtoReflect.Descriptor instead. +func (*IntervalRule) Descriptor() ([]byte, []int) { + return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{1} +} + +func (x *IntervalRule) GetUnit() IntervalRule_Unit { + if x != nil { + return x.Unit + } + return IntervalRule_UNIT_UNSPECIFIED +} + +func (x *IntervalRule) GetNum() uint32 { + if x != nil { + return x.Num + } + return 0 +} + type ResourceOpts struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -179,16 +285,19 @@ type ResourceOpts struct { // shard_num is the number of shards ShardNum uint32 `protobuf:"varint,1,opt,name=shard_num,json=shardNum,proto3" json:"shard_num,omitempty"` - // block_num specific how many blocks in a segment - BlockNum uint32 `protobuf:"varint,2,opt,name=block_num,json=blockNum,proto3" json:"block_num,omitempty"` + // block_interval indicates the length of a block + // block_interval should be less than or equal to segment_interval + BlockInterval *IntervalRule `protobuf:"bytes,2,opt,name=block_interval,json=blockInterval,proto3" json:"block_interval,omitempty"` + // segment_interval indicates the length of a segment + SegmentInterval *IntervalRule `protobuf:"bytes,3,opt,name=segment_interval,json=segmentInterval,proto3" json:"segment_interval,omitempty"` // ttl indicates time to live, how long the data will be cached - Ttl string `protobuf:"bytes,3,opt,name=ttl,proto3" json:"ttl,omitempty"` + Ttl *IntervalRule `protobuf:"bytes,4,opt,name=ttl,proto3" json:"ttl,omitempty"` } func (x *ResourceOpts) Reset() { *x = ResourceOpts{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_common_v1_common_proto_msgTypes[1] + mi := &file_banyandb_common_v1_common_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -201,7 +310,7 @@ func (x *ResourceOpts) String() string { func (*ResourceOpts) ProtoMessage() {} func (x *ResourceOpts) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_common_v1_common_proto_msgTypes[1] + mi := &file_banyandb_common_v1_common_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -214,7 +323,7 @@ func (x *ResourceOpts) ProtoReflect() protoreflect.Message { // Deprecated: Use ResourceOpts.ProtoReflect.Descriptor instead. func (*ResourceOpts) Descriptor() ([]byte, []int) { - return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{1} + return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{2} } func (x *ResourceOpts) GetShardNum() uint32 { @@ -224,18 +333,25 @@ func (x *ResourceOpts) GetShardNum() uint32 { return 0 } -func (x *ResourceOpts) GetBlockNum() uint32 { +func (x *ResourceOpts) GetBlockInterval() *IntervalRule { if x != nil { - return x.BlockNum + return x.BlockInterval } - return 0 + return nil +} + +func (x *ResourceOpts) GetSegmentInterval() *IntervalRule { + if x != nil { + return x.SegmentInterval + } + return nil } -func (x *ResourceOpts) GetTtl() string { +func (x *ResourceOpts) GetTtl() *IntervalRule { if x != nil { return x.Ttl } - return "" + return nil } // Group is an internal object for Group management @@ -257,7 +373,7 @@ type Group struct { func (x *Group) Reset() { *x = Group{} if protoimpl.UnsafeEnabled { - mi := &file_banyandb_common_v1_common_proto_msgTypes[2] + mi := &file_banyandb_common_v1_common_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -270,7 +386,7 @@ func (x *Group) String() string { func (*Group) ProtoMessage() {} func (x *Group) ProtoReflect() protoreflect.Message { - mi := &file_banyandb_common_v1_common_proto_msgTypes[2] + mi := &file_banyandb_common_v1_common_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -283,7 +399,7 @@ func (x *Group) ProtoReflect() protoreflect.Message { // Deprecated: Use Group.ProtoReflect.Descriptor instead. func (*Group) Descriptor() ([]byte, []int) { - return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{2} + return file_banyandb_common_v1_common_proto_rawDescGZIP(), []int{3} } func (x *Group) GetMetadata() *Metadata { @@ -333,41 +449,65 @@ var file_banyandb_common_v1_common_proto_rawDesc = []byte{ 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x6d, 0x6f, 0x64, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, - 0x6d, 0x6f, 0x64, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x5a, 0x0a, 0x0c, 0x52, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, - 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, - 0x73, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x62, 0x6c, 0x6f, - 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x22, 0xfa, 0x01, 0x0a, 0x05, 0x47, 0x72, 0x6f, 0x75, - 0x70, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x35, 0x0a, 0x07, 0x63, - 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x62, - 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, - 0x31, 0x2e, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x52, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, - 0x6f, 0x67, 0x12, 0x45, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6f, - 0x70, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, - 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x73, 0x52, 0x0c, 0x72, 0x65, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x64, 0x41, 0x74, 0x2a, 0x4b, 0x0a, 0x07, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x12, - 0x17, 0x0a, 0x13, 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, - 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x43, 0x41, 0x54, 0x41, - 0x4c, 0x4f, 0x47, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x10, 0x01, 0x12, 0x13, 0x0a, 0x0f, - 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x41, 0x53, 0x55, 0x52, 0x45, 0x10, - 0x02, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, - 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, - 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x5a, 0x42, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, - 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, - 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, - 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6d, 0x6f, 0x64, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0xa9, 0x01, 0x0a, 0x0c, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x43, 0x0a, 0x04, + 0x75, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x62, 0x61, 0x6e, + 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x52, 0x75, 0x6c, 0x65, 0x2e, 0x55, 0x6e, 0x69, + 0x74, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x82, 0x01, 0x02, 0x10, 0x01, 0x52, 0x04, 0x75, 0x6e, 0x69, + 0x74, 0x12, 0x19, 0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x42, 0x07, + 0xfa, 0x42, 0x04, 0x2a, 0x02, 0x20, 0x00, 0x52, 0x03, 0x6e, 0x75, 0x6d, 0x22, 0x39, 0x0a, 0x04, + 0x55, 0x6e, 0x69, 0x74, 0x12, 0x14, 0x0a, 0x10, 0x55, 0x4e, 0x49, 0x54, 0x5f, 0x55, 0x4e, 0x53, + 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, + 0x49, 0x54, 0x5f, 0x48, 0x4f, 0x55, 0x52, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x4e, 0x49, + 0x54, 0x5f, 0x44, 0x41, 0x59, 0x10, 0x02, 0x22, 0x9c, 0x02, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x24, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, + 0x64, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x42, 0x07, 0xfa, 0x42, 0x04, + 0x2a, 0x02, 0x20, 0x00, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x12, 0x51, + 0x0a, 0x0e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x76, 0x61, 0x6c, 0x52, 0x75, 0x6c, 0x65, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x8a, 0x01, 0x02, + 0x10, 0x01, 0x52, 0x0d, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, + 0x6c, 0x12, 0x55, 0x0a, 0x10, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, + 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, + 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x52, 0x75, 0x6c, 0x65, 0x42, 0x08, 0xfa, + 0x42, 0x05, 0x8a, 0x01, 0x02, 0x10, 0x01, 0x52, 0x0f, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, + 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x3c, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x76, 0x61, 0x6c, 0x52, 0x75, 0x6c, 0x65, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x8a, 0x01, 0x02, 0x10, + 0x01, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x22, 0x8e, 0x02, 0x0a, 0x05, 0x47, 0x72, 0x6f, 0x75, 0x70, + 0x12, 0x42, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x42, 0x08, 0xfa, 0x42, 0x05, 0x8a, 0x01, 0x02, 0x10, 0x01, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x12, 0x35, 0x0a, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x61, 0x74, 0x61, 0x6c, + 0x6f, 0x67, 0x52, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x12, 0x4f, 0x0a, 0x0d, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x4f, 0x70, 0x74, 0x73, 0x42, 0x08, 0xfa, 0x42, 0x05, 0x8a, 0x01, 0x02, 0x10, 0x01, 0x52, 0x0c, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x73, 0x12, 0x39, 0x0a, 0x0a, + 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x2a, 0x4b, 0x0a, 0x07, 0x43, 0x61, 0x74, 0x61, 0x6c, + 0x6f, 0x67, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x43, + 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x10, 0x01, 0x12, + 0x13, 0x0a, 0x0f, 0x43, 0x41, 0x54, 0x41, 0x4c, 0x4f, 0x47, 0x5f, 0x4d, 0x45, 0x41, 0x53, 0x55, + 0x52, 0x45, 0x10, 0x02, 0x42, 0x6e, 0x0a, 0x28, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, + 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, + 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, + 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, + 0x6e, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -382,25 +522,31 @@ func file_banyandb_common_v1_common_proto_rawDescGZIP() []byte { return file_banyandb_common_v1_common_proto_rawDescData } -var file_banyandb_common_v1_common_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_banyandb_common_v1_common_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_banyandb_common_v1_common_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_banyandb_common_v1_common_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_banyandb_common_v1_common_proto_goTypes = []interface{}{ (Catalog)(0), // 0: banyandb.common.v1.Catalog - (*Metadata)(nil), // 1: banyandb.common.v1.Metadata - (*ResourceOpts)(nil), // 2: banyandb.common.v1.ResourceOpts - (*Group)(nil), // 3: banyandb.common.v1.Group - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (IntervalRule_Unit)(0), // 1: banyandb.common.v1.IntervalRule.Unit + (*Metadata)(nil), // 2: banyandb.common.v1.Metadata + (*IntervalRule)(nil), // 3: banyandb.common.v1.IntervalRule + (*ResourceOpts)(nil), // 4: banyandb.common.v1.ResourceOpts + (*Group)(nil), // 5: banyandb.common.v1.Group + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp } var file_banyandb_common_v1_common_proto_depIdxs = []int32{ - 1, // 0: banyandb.common.v1.Group.metadata:type_name -> banyandb.common.v1.Metadata - 0, // 1: banyandb.common.v1.Group.catalog:type_name -> banyandb.common.v1.Catalog - 2, // 2: banyandb.common.v1.Group.resource_opts:type_name -> banyandb.common.v1.ResourceOpts - 4, // 3: banyandb.common.v1.Group.updated_at:type_name -> google.protobuf.Timestamp - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 1, // 0: banyandb.common.v1.IntervalRule.unit:type_name -> banyandb.common.v1.IntervalRule.Unit + 3, // 1: banyandb.common.v1.ResourceOpts.block_interval:type_name -> banyandb.common.v1.IntervalRule + 3, // 2: banyandb.common.v1.ResourceOpts.segment_interval:type_name -> banyandb.common.v1.IntervalRule + 3, // 3: banyandb.common.v1.ResourceOpts.ttl:type_name -> banyandb.common.v1.IntervalRule + 2, // 4: banyandb.common.v1.Group.metadata:type_name -> banyandb.common.v1.Metadata + 0, // 5: banyandb.common.v1.Group.catalog:type_name -> banyandb.common.v1.Catalog + 4, // 6: banyandb.common.v1.Group.resource_opts:type_name -> banyandb.common.v1.ResourceOpts + 6, // 7: banyandb.common.v1.Group.updated_at:type_name -> google.protobuf.Timestamp + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_banyandb_common_v1_common_proto_init() } @@ -422,7 +568,7 @@ func file_banyandb_common_v1_common_proto_init() { } } file_banyandb_common_v1_common_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ResourceOpts); i { + switch v := v.(*IntervalRule); i { case 0: return &v.state case 1: @@ -434,6 +580,18 @@ func file_banyandb_common_v1_common_proto_init() { } } file_banyandb_common_v1_common_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ResourceOpts); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_banyandb_common_v1_common_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Group); i { case 0: return &v.state @@ -451,8 +609,8 @@ func file_banyandb_common_v1_common_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_banyandb_common_v1_common_proto_rawDesc, - NumEnums: 1, - NumMessages: 3, + NumEnums: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/api/proto/banyandb/common/v1/common.pb.validate.go b/api/proto/banyandb/common/v1/common.pb.validate.go index e436e66..8081aeb 100644 --- a/api/proto/banyandb/common/v1/common.pb.validate.go +++ b/api/proto/banyandb/common/v1/common.pb.validate.go @@ -153,6 +153,127 @@ var _ interface { ErrorName() string } = MetadataValidationError{} +// Validate checks the field values on IntervalRule with the rules defined in +// the proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. +func (m *IntervalRule) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on IntervalRule with the rules defined +// in the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in IntervalRuleMultiError, or +// nil if none found. +func (m *IntervalRule) ValidateAll() error { + return m.validate(true) +} + +func (m *IntervalRule) validate(all bool) error { + if m == nil { + return nil + } + + var errors []error + + if _, ok := IntervalRule_Unit_name[int32(m.GetUnit())]; !ok { + err := IntervalRuleValidationError{ + field: "Unit", + reason: "value must be one of the defined enum values", + } + if !all { + return err + } + errors = append(errors, err) + } + + if m.GetNum() <= 0 { + err := IntervalRuleValidationError{ + field: "Num", + reason: "value must be greater than 0", + } + if !all { + return err + } + errors = append(errors, err) + } + + if len(errors) > 0 { + return IntervalRuleMultiError(errors) + } + + return nil +} + +// IntervalRuleMultiError is an error wrapping multiple validation errors +// returned by IntervalRule.ValidateAll() if the designated constraints aren't met. +type IntervalRuleMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m IntervalRuleMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m IntervalRuleMultiError) AllErrors() []error { return m } + +// IntervalRuleValidationError is the validation error returned by +// IntervalRule.Validate if the designated constraints aren't met. +type IntervalRuleValidationError struct { + field string + reason string + cause error + key bool +} + +// Field function returns field value. +func (e IntervalRuleValidationError) Field() string { return e.field } + +// Reason function returns reason value. +func (e IntervalRuleValidationError) Reason() string { return e.reason } + +// Cause function returns cause value. +func (e IntervalRuleValidationError) Cause() error { return e.cause } + +// Key function returns key value. +func (e IntervalRuleValidationError) Key() bool { return e.key } + +// ErrorName returns error name. +func (e IntervalRuleValidationError) ErrorName() string { return "IntervalRuleValidationError" } + +// Error satisfies the builtin error interface +func (e IntervalRuleValidationError) Error() string { + cause := "" + if e.cause != nil { + cause = fmt.Sprintf(" | caused by: %v", e.cause) + } + + key := "" + if e.key { + key = "key for " + } + + return fmt.Sprintf( + "invalid %sIntervalRule.%s: %s%s", + key, + e.field, + e.reason, + cause) +} + +var _ error = IntervalRuleValidationError{} + +var _ interface { + Field() string + Reason() string + Key() bool + Cause() error + ErrorName() string +} = IntervalRuleValidationError{} + // Validate checks the field values on ResourceOpts with the rules defined in // the proto definition for this message. If any rules are violated, the first // error encountered is returned, or nil if there are no violations. @@ -175,11 +296,136 @@ func (m *ResourceOpts) validate(all bool) error { var errors []error - // no validation rules for ShardNum + if m.GetShardNum() <= 0 { + err := ResourceOptsValidationError{ + field: "ShardNum", + reason: "value must be greater than 0", + } + if !all { + return err + } + errors = append(errors, err) + } - // no validation rules for BlockNum + if m.GetBlockInterval() == nil { + err := ResourceOptsValidationError{ + field: "BlockInterval", + reason: "value is required", + } + if !all { + return err + } + errors = append(errors, err) + } + + if all { + switch v := interface{}(m.GetBlockInterval()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "BlockInterval", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "BlockInterval", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetBlockInterval()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return ResourceOptsValidationError{ + field: "BlockInterval", + reason: "embedded message failed validation", + cause: err, + } + } + } - // no validation rules for Ttl + if m.GetSegmentInterval() == nil { + err := ResourceOptsValidationError{ + field: "SegmentInterval", + reason: "value is required", + } + if !all { + return err + } + errors = append(errors, err) + } + + if all { + switch v := interface{}(m.GetSegmentInterval()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "SegmentInterval", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "SegmentInterval", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetSegmentInterval()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return ResourceOptsValidationError{ + field: "SegmentInterval", + reason: "embedded message failed validation", + cause: err, + } + } + } + + if m.GetTtl() == nil { + err := ResourceOptsValidationError{ + field: "Ttl", + reason: "value is required", + } + if !all { + return err + } + errors = append(errors, err) + } + + if all { + switch v := interface{}(m.GetTtl()).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "Ttl", + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, ResourceOptsValidationError{ + field: "Ttl", + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(m.GetTtl()).(interface{ Validate() error }); ok { + if err := v.Validate(); err != nil { + return ResourceOptsValidationError{ + field: "Ttl", + reason: "embedded message failed validation", + cause: err, + } + } + } if len(errors) > 0 { return ResourceOptsMultiError(errors) @@ -279,6 +525,17 @@ func (m *Group) validate(all bool) error { var errors []error + if m.GetMetadata() == nil { + err := GroupValidationError{ + field: "Metadata", + reason: "value is required", + } + if !all { + return err + } + errors = append(errors, err) + } + if all { switch v := interface{}(m.GetMetadata()).(type) { case interface{ ValidateAll() error }: @@ -310,6 +567,17 @@ func (m *Group) validate(all bool) error { // no validation rules for Catalog + if m.GetResourceOpts() == nil { + err := GroupValidationError{ + field: "ResourceOpts", + reason: "value is required", + } + if !all { + return err + } + errors = append(errors, err) + } + if all { switch v := interface{}(m.GetResourceOpts()).(type) { case interface{ ValidateAll() error }: diff --git a/api/proto/banyandb/common/v1/common.proto b/api/proto/banyandb/common/v1/common.proto index b00d04c..424d511 100644 --- a/api/proto/banyandb/common/v1/common.proto +++ b/api/proto/banyandb/common/v1/common.proto @@ -44,23 +44,38 @@ message Metadata { int64 mod_revision = 5; } +// IntervalRule is a structured duration +message IntervalRule { + enum Unit { + UNIT_UNSPECIFIED = 0; + UNIT_HOUR = 1; + UNIT_DAY = 2; + } + // unit can only be UNIT_HOUR or UNIT_DAY + Unit unit = 1 [(validate.rules).enum.defined_only = true]; + uint32 num = 2 [(validate.rules).uint32.gt = 0]; +} + message ResourceOpts { // shard_num is the number of shards - uint32 shard_num = 1; - // block_num specific how many blocks in a segment - uint32 block_num = 2; + uint32 shard_num = 1 [(validate.rules).uint32.gt = 0]; + // block_interval indicates the length of a block + // block_interval should be less than or equal to segment_interval + IntervalRule block_interval = 2 [(validate.rules).message.required = true]; + // segment_interval indicates the length of a segment + IntervalRule segment_interval = 3 [(validate.rules).message.required = true];; // ttl indicates time to live, how long the data will be cached - string ttl = 3; + IntervalRule ttl = 4 [(validate.rules).message.required = true]; } // Group is an internal object for Group management message Group { // metadata define the group's identity - common.v1.Metadata metadata = 1; + common.v1.Metadata metadata = 1 [(validate.rules).message.required = true]; // catalog denotes which type of data the group contains common.v1.Catalog catalog = 2; // resourceOpts indicates the structure of the underlying kv storage - ResourceOpts resource_opts = 3; + ResourceOpts resource_opts = 3 [(validate.rules).message.required = true]; // updated_at indicates when resources of the group are updated google.protobuf.Timestamp updated_at = 4; } diff --git a/api/proto/openapi/banyandb/database/v1/rpc.swagger.json b/api/proto/openapi/banyandb/database/v1/rpc.swagger.json index a452822..b8a1f95 100644 --- a/api/proto/openapi/banyandb/database/v1/rpc.swagger.json +++ b/api/proto/openapi/banyandb/database/v1/rpc.swagger.json @@ -1377,6 +1377,15 @@ "default": "ANALYZER_UNSPECIFIED", "description": " - ANALYZER_KEYWORD: Keyword analyzer is a “noop” analyzer which returns the entire input string as a single token.\n - ANALYZER_STANDARD: Standard analyzer provides grammar based tokenization\n - ANALYZER_SIMPLE: Simple analyzer breaks text into tokens at any non-letter character, \nsuch as numbers, spaces, hyphens and apostrophes, discards non-letter characters, \nand changes uppercase to lowercase." }, + "IntervalRuleUnit": { + "type": "string", + "enum": [ + "UNIT_UNSPECIFIED", + "UNIT_HOUR", + "UNIT_DAY" + ], + "default": "UNIT_UNSPECIFIED" + }, "LogicalExpressionLogicalOp": { "type": "string", "enum": [ @@ -1780,6 +1789,20 @@ } } }, + "v1IntervalRule": { + "type": "object", + "properties": { + "unit": { + "$ref": "#/definitions/IntervalRuleUnit", + "title": "unit can only be UNIT_HOUR or UNIT_DAY" + }, + "num": { + "type": "integer", + "format": "int64" + } + }, + "title": "IntervalRule is a structured duration" + }, "v1LogicalExpression": { "type": "object", "properties": { @@ -1910,13 +1933,16 @@ "format": "int64", "title": "shard_num is the number of shards" }, - "blockNum": { - "type": "integer", - "format": "int64", - "title": "block_num specific how many blocks in a segment" + "blockInterval": { + "$ref": "#/definitions/v1IntervalRule", + "title": "block_interval indicates the length of a block\nblock_interval should be less than or equal to segment_interval" + }, + "segmentInterval": { + "$ref": "#/definitions/v1IntervalRule", + "title": "segment_interval indicates the length of a segment" }, "ttl": { - "type": "string", + "$ref": "#/definitions/v1IntervalRule", "title": "ttl indicates time to live, how long the data will be cached" } } diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 3f3c8da..fa4a545 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" + pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) @@ -207,6 +208,13 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { EncoderPool: newEncoderPool(name, plainChunkSize, intChunkSize, s.l), DecoderPool: newDecoderPool(name, plainChunkSize, intChunkSize, s.l), } + var err error + if opts.BlockInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.BlockInterval); err != nil { + return nil, err + } + if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil { + return nil, err + } return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "measure", diff --git a/banyand/metadata/schema/testdata/group.json b/banyand/metadata/schema/testdata/group.json index ec54e01..e200d01 100644 --- a/banyand/metadata/schema/testdata/group.json +++ b/banyand/metadata/schema/testdata/group.json @@ -5,7 +5,18 @@ "catalog": "CATALOG_STREAM", "resource_opts": { "shard_num": 2, - "ttl": "7d" + "block_interval": { + "unit": "UNIT_HOUR", + "num": 2 + }, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index fce3d91..67f1f82 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -32,6 +32,8 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" + + pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) @@ -209,6 +211,13 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { EncoderPool: encoding.NewPlainEncoderPool(name, chunkSize), DecoderPool: encoding.NewPlainDecoderPool(name, chunkSize), } + var err error + if opts.BlockInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.BlockInterval); err != nil { + return nil, err + } + if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil { + return nil, err + } return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "stream", diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 2f77874..3828e86 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -87,8 +87,8 @@ type DatabaseOpts struct { Location string ShardNum uint32 EncodingMethod EncodingMethod - SegmentSize IntervalRule - BlockSize IntervalRule + SegmentInterval IntervalRule + BlockInterval IntervalRule BlockMemSize int64 SeriesMemSize int64 EnableGlobalIndex bool @@ -160,19 +160,13 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { if _, err := mkdir(opts.Location); err != nil { return nil, err } - segmentSize := opts.SegmentSize + segmentSize := opts.SegmentInterval if segmentSize.Num == 0 { - segmentSize = IntervalRule{ - Unit: DAY, - Num: 1, - } + return nil, errors.Wrap(ErrOpenDatabase, "segment interval is absent") } - blockSize := opts.BlockSize + blockSize := opts.BlockInterval if blockSize.Num == 0 { - blockSize = IntervalRule{ - Unit: HOUR, - Num: 2, - } + return nil, errors.Wrap(ErrOpenDatabase, "block interval is absent") } if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() { return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size") diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index 28cfeaa..b3f9ede 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -79,6 +79,8 @@ func openDatabase(t *require.Assertions, path string) (db Database) { EncoderPool: encoding.NewPlainEncoderPool("tsdb", 0), DecoderPool: encoding.NewPlainDecoderPool("tsdb", 0), }, + BlockInterval: IntervalRule{Num: 2}, + SegmentInterval: IntervalRule{Num: 1, Unit: DAY}, }) t.NoError(err) t.NotNil(db) diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go index b47d8a3..7938a38 100644 --- a/bydbctl/internal/cmd/group.go +++ b/bydbctl/internal/cmd/group.go @@ -24,6 +24,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/version" "github.com/go-resty/resty/v2" "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" database_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -44,7 +45,7 @@ func newGroupCmd() *cobra.Command { return rest(func() ([]reqBody, error) { return parseNameFromYAML(cmd.InOrStdin()) }, func(request request) (*resty.Response, error) { g := new(common_v1.Group) - err := json.Unmarshal(request.data, g) + err := protojson.Unmarshal(request.data, g) if err != nil { return nil, err } diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go index 046abaa..479c136 100644 --- a/bydbctl/internal/cmd/stream_test.go +++ b/bydbctl/internal/cmd/stream_test.go @@ -66,7 +66,19 @@ var _ = Describe("Stream", func() { rootCmd.SetArgs([]string{"group", "create", "-f", "-"}) rootCmd.SetIn(strings.NewReader(` metadata: - name: group1`)) + name: group1 +catalog: CATALOG_STREAM +resource_opts: + shard_num: 2 + block_interval: + unit: UNIT_HOUR + num: 2 + segment_interval: + unit: UNIT_DAY + num: 1 + ttl: + unit: UNIT_DAY + num: 7`)) out := capturer.CaptureStdout(func() { err := rootCmd.Execute() Expect(err).NotTo(HaveOccurred()) diff --git a/docs/api-reference.md b/docs/api-reference.md index d00648c..10843bb 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -5,10 +5,12 @@ - [banyandb/common/v1/common.proto](#banyandb_common_v1_common-proto) - [Group](#banyandb-common-v1-Group) + - [IntervalRule](#banyandb-common-v1-IntervalRule) - [Metadata](#banyandb-common-v1-Metadata) - [ResourceOpts](#banyandb-common-v1-ResourceOpts) - [Catalog](#banyandb-common-v1-Catalog) + - [IntervalRule.Unit](#banyandb-common-v1-IntervalRule-Unit) - [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) - [Node](#banyandb-database-v1-Node) @@ -223,6 +225,22 @@ Group is an internal object for Group management +<a name="banyandb-common-v1-IntervalRule"></a> + +### IntervalRule +IntervalRule is a structured duration + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| unit | [IntervalRule.Unit](#banyandb-common-v1-IntervalRule-Unit) | | unit can only be UNIT_HOUR or UNIT_DAY | +| num | [uint32](#uint32) | | | + + + + + + <a name="banyandb-common-v1-Metadata"></a> ### Metadata @@ -251,8 +269,9 @@ Metadata is for multi-tenant, multi-model use | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | | shard_num | [uint32](#uint32) | | shard_num is the number of shards | -| block_num | [uint32](#uint32) | | block_num specific how many blocks in a segment | -| ttl | [string](#string) | | ttl indicates time to live, how long the data will be cached | +| block_interval | [IntervalRule](#banyandb-common-v1-IntervalRule) | | block_interval indicates the length of a block block_interval should be less than or equal to segment_interval | +| segment_interval | [IntervalRule](#banyandb-common-v1-IntervalRule) | | segment_interval indicates the length of a segment | +| ttl | [IntervalRule](#banyandb-common-v1-IntervalRule) | | ttl indicates time to live, how long the data will be cached | @@ -273,6 +292,19 @@ Metadata is for multi-tenant, multi-model use | CATALOG_MEASURE | 2 | | + +<a name="banyandb-common-v1-IntervalRule-Unit"></a> + +### IntervalRule.Unit + + +| Name | Number | Description | +| ---- | ------ | ----------- | +| UNIT_UNSPECIFIED | 0 | | +| UNIT_HOUR | 1 | | +| UNIT_DAY | 2 | | + + diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go index 13b3202..27c6773 100644 --- a/pkg/pb/v1/metadata.go +++ b/pkg/pb/v1/metadata.go @@ -18,11 +18,17 @@ package v1 import ( - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "errors" + + common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + database_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/tsdb" ) -func FindTagByName(families []*databasev1.TagFamilySpec, tagName string) (int, int, *databasev1.TagSpec) { +var ErrInvalidUnit = errors.New("Invalid interval rule's unit") + +func FindTagByName(families []*database_v1.TagFamilySpec, tagName string) (int, int, *database_v1.TagSpec) { for fi, family := range families { for ti, tag := range family.Tags { if tagName == tag.GetName() { @@ -33,41 +39,41 @@ func FindTagByName(families []*databasev1.TagFamilySpec, tagName string) (int, i return 0, 0, nil } -func TagValueTypeConv(tagValue *modelv1.TagValue) (tagType databasev1.TagType, isNull bool) { +func TagValueTypeConv(tagValue *model_v1.TagValue) (tagType database_v1.TagType, isNull bool) { switch tagValue.GetValue().(type) { - case *modelv1.TagValue_Int: - return databasev1.TagType_TAG_TYPE_INT, false - case *modelv1.TagValue_Str: - return databasev1.TagType_TAG_TYPE_STRING, false - case *modelv1.TagValue_IntArray: - return databasev1.TagType_TAG_TYPE_INT_ARRAY, false - case *modelv1.TagValue_StrArray: - return databasev1.TagType_TAG_TYPE_STRING_ARRAY, false - case *modelv1.TagValue_BinaryData: - return databasev1.TagType_TAG_TYPE_DATA_BINARY, false - case *modelv1.TagValue_Id: - return databasev1.TagType_TAG_TYPE_ID, false - case *modelv1.TagValue_Null: - return databasev1.TagType_TAG_TYPE_UNSPECIFIED, true + case *model_v1.TagValue_Int: + return database_v1.TagType_TAG_TYPE_INT, false + case *model_v1.TagValue_Str: + return database_v1.TagType_TAG_TYPE_STRING, false + case *model_v1.TagValue_IntArray: + return database_v1.TagType_TAG_TYPE_INT_ARRAY, false + case *model_v1.TagValue_StrArray: + return database_v1.TagType_TAG_TYPE_STRING_ARRAY, false + case *model_v1.TagValue_BinaryData: + return database_v1.TagType_TAG_TYPE_DATA_BINARY, false + case *model_v1.TagValue_Id: + return database_v1.TagType_TAG_TYPE_ID, false + case *model_v1.TagValue_Null: + return database_v1.TagType_TAG_TYPE_UNSPECIFIED, true } - return databasev1.TagType_TAG_TYPE_UNSPECIFIED, false + return database_v1.TagType_TAG_TYPE_UNSPECIFIED, false } -func FieldValueTypeConv(tagValue *modelv1.FieldValue) (tagType databasev1.FieldType, isNull bool) { +func FieldValueTypeConv(tagValue *model_v1.FieldValue) (tagType database_v1.FieldType, isNull bool) { switch tagValue.GetValue().(type) { - case *modelv1.FieldValue_Int: - return databasev1.FieldType_FIELD_TYPE_INT, false - case *modelv1.FieldValue_Str: - return databasev1.FieldType_FIELD_TYPE_STRING, false - case *modelv1.FieldValue_BinaryData: - return databasev1.FieldType_FIELD_TYPE_DATA_BINARY, false - case *modelv1.FieldValue_Null: - return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, true + case *model_v1.FieldValue_Int: + return database_v1.FieldType_FIELD_TYPE_INT, false + case *model_v1.FieldValue_Str: + return database_v1.FieldType_FIELD_TYPE_STRING, false + case *model_v1.FieldValue_BinaryData: + return database_v1.FieldType_FIELD_TYPE_DATA_BINARY, false + case *model_v1.FieldValue_Null: + return database_v1.FieldType_FIELD_TYPE_UNSPECIFIED, true } - return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false + return database_v1.FieldType_FIELD_TYPE_UNSPECIFIED, false } -func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) { +func ParseMaxModRevision(indexRules []*database_v1.IndexRule) (maxRevisionForIdxRules int64) { maxRevisionForIdxRules = int64(0) for _, idxRule := range indexRules { if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules { @@ -76,3 +82,16 @@ func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxR } return } + +func ToIntervalRule(ir *common_v1.IntervalRule) (result tsdb.IntervalRule, err error) { + switch ir.Unit { + case common_v1.IntervalRule_UNIT_DAY: + result.Unit = tsdb.DAY + case common_v1.IntervalRule_UNIT_HOUR: + result.Unit = tsdb.HOUR + default: + return result, ErrInvalidUnit + } + result.Num = int(ir.Num) + return result, err +} diff --git a/pkg/test/measure/testdata/groups/sw_metric.json b/pkg/test/measure/testdata/groups/sw_metric.json index 186830c..4b7efc4 100644 --- a/pkg/test/measure/testdata/groups/sw_metric.json +++ b/pkg/test/measure/testdata/groups/sw_metric.json @@ -5,8 +5,18 @@ "catalog": "CATALOG_MEASURE", "resource_opts": { "shard_num": 2, - "block_num": 12, - "ttl": "7d" + "block_interval": { + "unit": "UNIT_HOUR", + "num": 2 + }, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file diff --git a/pkg/test/stream/testdata/group.json b/pkg/test/stream/testdata/group.json index ec54e01..e200d01 100644 --- a/pkg/test/stream/testdata/group.json +++ b/pkg/test/stream/testdata/group.json @@ -5,7 +5,18 @@ "catalog": "CATALOG_STREAM", "resource_opts": { "shard_num": 2, - "ttl": "7d" + "block_interval": { + "unit": "UNIT_HOUR", + "num": 2 + }, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 7 + } }, "updated_at": "2021-04-15T01:30:15.01Z" } \ No newline at end of file
