This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch optimize-memory
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 42b52155cf05bc784bbe12e0c0390376b2e3cf6d Author: Gao Hongtao <[email protected]> AuthorDate: Mon Aug 8 15:23:15 2022 +0000 Parameterize memory size Signed-off-by: Gao Hongtao <[email protected]> --- api/proto/banyandb/measure/v1/topn.pb.go | 2 +- api/proto/banyandb/measure/v1/write.pb.go | 2 +- api/proto/banyandb/model/v1/common.pb.go | 2 - banyand/kv/kv.go | 29 +++++ banyand/measure/metadata.go | 26 +++-- banyand/measure/service.go | 9 +- banyand/stream/metadata.go | 26 +++-- banyand/stream/service.go | 13 ++- banyand/stream/stream.go | 9 +- banyand/tsdb/block.go | 183 +++++++++++++++++++----------- banyand/tsdb/bucket/strategy.go | 70 +++++------- banyand/tsdb/index/writer.go | 38 ++++--- banyand/tsdb/metric.go | 12 +- banyand/tsdb/segment.go | 78 +++++++------ banyand/tsdb/series.go | 10 +- banyand/tsdb/seriesdb.go | 43 ++++--- banyand/tsdb/shard.go | 4 +- banyand/tsdb/shard_test.go | 26 ++--- banyand/tsdb/tsdb.go | 20 ++-- banyand/tsdb/tsdb_suite_test.go | 2 +- dist/LICENSE | 4 +- go.mod | 30 +++-- go.sum | 9 +- pkg/index/index.go | 22 ++-- pkg/index/lsm/lsm.go | 11 +- pkg/run/channel.go | 50 -------- pkg/test/helpers/fail_interceptor.go | 5 +- 27 files changed, 401 insertions(+), 334 deletions(-) diff --git a/api/proto/banyandb/measure/v1/topn.pb.go b/api/proto/banyandb/measure/v1/topn.pb.go index 2f3cc61..92eb78e 100644 --- a/api/proto/banyandb/measure/v1/topn.pb.go +++ b/api/proto/banyandb/measure/v1/topn.pb.go @@ -41,7 +41,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// TopNList contains a series of topN items +//TopNList contains a series of topN items type TopNList struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/api/proto/banyandb/measure/v1/write.pb.go b/api/proto/banyandb/measure/v1/write.pb.go index ac30c4c..e693cbb 100644 --- a/api/proto/banyandb/measure/v1/write.pb.go +++ b/api/proto/banyandb/measure/v1/write.pb.go @@ -41,7 +41,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// DataPointValue is the data point for writing. It only contains values. +//DataPointValue is the data point for writing. It only contains values. 
type DataPointValue struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/api/proto/banyandb/model/v1/common.pb.go b/api/proto/banyandb/model/v1/common.pb.go index cd5e94a..b88a476 100644 --- a/api/proto/banyandb/model/v1/common.pb.go +++ b/api/proto/banyandb/model/v1/common.pb.go @@ -337,7 +337,6 @@ type TagValue struct { unknownFields protoimpl.UnknownFields // Types that are assignable to Value: - // // *TagValue_Null // *TagValue_Str // *TagValue_StrArray @@ -535,7 +534,6 @@ type FieldValue struct { unknownFields protoimpl.UnknownFields // Types that are assignable to Value: - // // *FieldValue_Null // *FieldValue_Str // *FieldValue_Int diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index 9ae630f..f40d05d 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -126,6 +126,17 @@ func TSSWithFlushCallback(callback func()) TimeSeriesOptions { } } +func TSSWithMemTableSize(size int64) TimeSeriesOptions { + return func(store TimeSeriesStore) { + if size < 1 { + return + } + if btss, ok := store.(*badgerTSS); ok { + btss.dbOpts.MemTableSize = size + } + } +} + type Iterator interface { Next() Rewind() @@ -160,6 +171,9 @@ func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) } // Put all values into LSM btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0) + if btss.dbOpts.MemTableSize < 8<<20 { + btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10) + } var err error btss.db, err = badger.Open(btss.dbOpts) if err != nil { @@ -187,6 +201,18 @@ func StoreWithNamedLogger(name string, l *logger.Logger) StoreOptions { } } +// StoreWithMemTableSize sets MemTable size +func StoreWithMemTableSize(size int64) StoreOptions { + return func(store Store) { + if size < 1 { + return + } + if bdb, ok := store.(*badgerDB); ok { + bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size) + } + } +} + // OpenStore creates a new Store func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) { bdb := new(badgerDB) @@ -196,6 +222,9 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) opt(bdb) } bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32) + if bdb.dbOpts.MemTableSize > 0 && bdb.dbOpts.MemTableSize < 8<<20 { + bdb.dbOpts = bdb.dbOpts.WithValueThreshold(1 << 10) + } var err error bdb.db, err = badger.Open(bdb.dbOpts) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 2fad3d5..ef235a3 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -39,7 +39,9 @@ type schemaRepo struct { metadata metadata.Repo } -func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo { +func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, + dbOpts tsdb.DatabaseOpts, l *logger.Logger, +) schemaRepo { return schemaRepo{ l: l, metadata: metadata, @@ -47,7 +49,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe metadata, repo, l, - newSupplier(path, metadata, l), + newSupplier(path, metadata, dbOpts, l), event.MeasureTopicShardEvent, event.MeasureTopicEntityEvent, ), @@ -168,13 +170,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { path string + dbOpts tsdb.DatabaseOpts metadata metadata.Repo l *logger.Logger } -func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier { +func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier { + dbOpts.EncodingMethod = 
tsdb.EncodingMethod{ + EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, l), + DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, l), + } return &supplier{ path: path, + dbOpts: dbOpts, metadata: metadata, l: l, } @@ -195,17 +203,13 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re } func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { + opts := s.dbOpts + opts.ShardNum = groupSchema.ResourceOpts.ShardNum + opts.Location = path.Join(s.path, groupSchema.Metadata.Name) return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "measure", Database: groupSchema.Metadata.Name, }), - tsdb.DatabaseOpts{ - Location: path.Join(s.path, groupSchema.Metadata.Name), - ShardNum: groupSchema.ResourceOpts.ShardNum, - EncodingMethod: tsdb.EncodingMethod{ - EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, s.l), - DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, s.l), - }, - }) + opts) } diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 18239a2..26219d2 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" @@ -50,11 +51,13 @@ type Service interface { var _ Service = (*service)(nil) type service struct { + root string + dbOpts tsdb.DatabaseOpts + schemaRepo schemaRepo writeListener *writeCallback l *logger.Logger metadata metadata.Repo - root string pipeline queue.Queue repo discovery.ServiceRepo // stop channel for the service @@ -76,6 +79,8 @@ func (s *service) LoadGroup(name string) (resourceSchema.Group, bool) { func (s *service) FlagSet() *run.FlagSet { flagS := run.NewFlagSet("storage") flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of database") + flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 16<<20, "block memory size") + flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 1<<20, "series metadata memory size") return flagS } @@ -98,7 +103,7 @@ func (s *service) PreRun() error { if err != nil { return err } - s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l) + s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l) for _, g := range groups { if g.Catalog != commonv1.Catalog_CATALOG_MEASURE { continue diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 0895b16..172ed7a 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -41,7 +41,9 @@ type schemaRepo struct { metadata metadata.Repo } -func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, l *logger.Logger) schemaRepo { +func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo, + dbOpts tsdb.DatabaseOpts, l *logger.Logger, +) schemaRepo { return schemaRepo{ l: l, metadata: metadata, @@ -49,7 +51,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe metadata, repo, l, - newSupplier(path, metadata, l), + newSupplier(path, metadata, dbOpts, l), event.StreamTopicShardEvent, 
event.StreamTopicEntityEvent, ), @@ -170,13 +172,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil) type supplier struct { path string + dbOpts tsdb.DatabaseOpts metadata metadata.Repo l *logger.Logger } -func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) *supplier { +func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier { + dbOpts.EncodingMethod = tsdb.EncodingMethod{ + EncoderPool: encoding.NewPlainEncoderPool(chunkSize), + DecoderPool: encoding.NewPlainDecoderPool(chunkSize), + } return &supplier{ path: path, + dbOpts: dbOpts, metadata: metadata, l: l, } @@ -197,17 +205,13 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re } func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { + opts := s.dbOpts + opts.ShardNum = groupSchema.ResourceOpts.ShardNum + opts.Location = path.Join(s.path, groupSchema.Metadata.Name) return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "stream", Database: groupSchema.Metadata.Name, }), - tsdb.DatabaseOpts{ - Location: path.Join(s.path, groupSchema.Metadata.Name), - ShardNum: groupSchema.ResourceOpts.ShardNum, - EncodingMethod: tsdb.EncodingMethod{ - EncoderPool: encoding.NewPlainEncoderPool(chunkSize), - DecoderPool: encoding.NewPlainDecoderPool(chunkSize), - }, - }) + opts) } diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 1109ad4..2a7f555 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -49,11 +50,13 @@ type Service interface { var _ Service = (*service)(nil) type service struct { + root string + dbOpts tsdb.DatabaseOpts + schemaRepo schemaRepo writeListener *writeCallback l *logger.Logger metadata metadata.Repo - root string pipeline queue.Queue repo discovery.ServiceRepo // stop channel for the service @@ -71,6 +74,9 @@ func (s *service) Stream(metadata *commonv1.Metadata) (Stream, error) { func (s *service) FlagSet() *run.FlagSet { flagS := run.NewFlagSet("storage") flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of database") + flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, "block memory size") + flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 1<<20, "series metadata memory size") + flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, "stream-global-index-mem-size", 2<<20, "global index memory size") return flagS } @@ -93,7 +99,7 @@ func (s *service) PreRun() error { if err != nil { return err } - s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.l) + s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l) for _, g := range groups { if g.Catalog != commonv1.Catalog_CATALOG_STREAM { continue @@ -149,5 +155,8 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic metadata: metadata, repo: repo, pipeline: pipeline, + dbOpts: tsdb.DatabaseOpts{ + EnableGlobalIndex: true, + }, }, nil } diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 83854ac..1af3871 100644 --- a/banyand/stream/stream.go 
+++ b/banyand/stream/stream.go @@ -93,10 +93,11 @@ func openStream(shardNum uint32, db tsdb.Supplier, spec streamSpec, l *logger.Lo sm.db = db sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{ - DB: db, - ShardNum: shardNum, - Families: spec.schema.TagFamilies, - IndexRules: spec.indexRules, + DB: db, + ShardNum: shardNum, + Families: spec.schema.TagFamilies, + IndexRules: spec.indexRules, + EnableGlobalIndex: true, }) return sm, nil } diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 3419a34..21290f6 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -21,13 +21,12 @@ import ( "context" "io" "path" + "runtime" "strconv" "sync" + "sync/atomic" "time" - "github.com/dgraph-io/ristretto/z" - "go.uber.org/atomic" - "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -37,7 +36,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/lsm" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -48,13 +46,16 @@ const ( ) type block struct { - path string - l *logger.Logger - suffix string - ref *z.Closer - lock sync.RWMutex - closed *atomic.Bool - position common.Position + path string + l *logger.Logger + queue bucket.Queue + suffix string + ref *atomic.Int32 + closed *atomic.Bool + lock sync.RWMutex + position common.Position + memSize int64 + lsmMemSize int64 store kv.TimeSeriesStore invertedIndex index.Store @@ -65,8 +66,7 @@ type block struct { segID uint16 blockID uint16 encodingMethod EncodingMethod - flushCh *run.Chan[struct{}] - flushChQueue chan *run.Chan[struct{}] + flushCh chan struct{} } type blockOpts struct { @@ -75,6 +75,7 @@ type blockOpts struct { startTime time.Time suffix string path string + queue bucket.Queue } func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { @@ -84,60 +85,66 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { } id := GenerateInternalID(opts.blockSize.Unit, suffixInteger) timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false) - encodingMethodObject := ctx.Value(encodingMethodKey) - if encodingMethodObject == nil { - encodingMethodObject = EncodingMethod{ - EncoderPool: encoding.NewPlainEncoderPool(0), - DecoderPool: encoding.NewPlainDecoderPool(0), - } - } clock, _ := timestamp.GetClock(ctx) b = &block{ - segID: opts.segID, - blockID: id, - path: opts.path, - l: logger.Fetch(ctx, "block"), - TimeRange: timeRange, - Reporter: bucket.NewTimeBasedReporter(timeRange, clock), - closed: atomic.NewBool(true), - encodingMethod: encodingMethodObject.(EncodingMethod), - flushChQueue: make(chan *run.Chan[struct{}]), + segID: opts.segID, + blockID: id, + path: opts.path, + l: logger.Fetch(ctx, "block"), + TimeRange: timeRange, + Reporter: bucket.NewTimeBasedReporter(timeRange, clock), + flushCh: make(chan struct{}), + ref: &atomic.Int32{}, + closed: &atomic.Bool{}, + queue: opts.queue, } + b.options(ctx) position := ctx.Value(common.PositionKey) if position != nil { b.position = position.(common.Position) } go func() { - for { - ch := <-b.flushChQueue - for { - _, more := ch.Read() - if !more { - break - } - b.flush() - } + for range b.flushCh { + b.flush() } }() - return b, err + return b, b.open() } -func (b *block) open() (err error) { - 
b.lock.Lock() - defer b.lock.Unlock() - if !b.closed.Load() { - return nil +func (b *block) options(ctx context.Context) { + var options DatabaseOpts + o := ctx.Value(optionsKey) + if o != nil { + options = o.(DatabaseOpts) + } + if options.EncodingMethod.EncoderPool == nil { + options.EncodingMethod.EncoderPool = encoding.NewPlainEncoderPool(0) + } + if options.EncodingMethod.EncoderPool == nil { + options.EncodingMethod.DecoderPool = encoding.NewPlainDecoderPool(0) + } + b.encodingMethod = options.EncodingMethod + if options.BlockMemSize < 1 { + b.memSize = 8 << 20 // 8MB + } else { + b.memSize = options.BlockMemSize } - b.ref = z.NewCloser(1) - b.flushCh = run.NewChan(make(chan struct{})) - b.flushChQueue <- b.flushCh + b.lsmMemSize = b.memSize / 8 + defaultLSMMemSize := int64(1 << 20) + if b.lsmMemSize < defaultLSMMemSize { + b.lsmMemSize = defaultLSMMemSize + } +} + +func (b *block) open() (err error) { if b.store, err = kv.OpenTimeSeriesStore( 0, path.Join(b.path, componentMain), kv.TSSWithEncoding(b.encodingMethod.EncoderPool, b.encodingMethod.DecoderPool), kv.TSSWithLogger(b.l.Named(componentMain)), + kv.TSSWithMemTableSize(b.memSize), kv.TSSWithFlushCallback(func() { - b.flushCh.Write(struct{}{}) + b.flushCh <- struct{}{} }), ); err != nil { return err @@ -150,33 +157,76 @@ func (b *block) open() (err error) { return err } if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{ - Path: path.Join(b.path, componentSecondLSMIdx), - Logger: b.l.Named(componentSecondLSMIdx), + Path: path.Join(b.path, componentSecondLSMIdx), + Logger: b.l.Named(componentSecondLSMIdx), + MemTableSize: b.lsmMemSize, }); err != nil { return err } b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex) + b.ref.Store(0) b.closed.Store(false) - return nil } -func (b *block) delegate() blockDelegate { - if b.isClosed() { - return nil +func (b *block) delegate() (blockDelegate, error) { + if b.incRef() { + return &bDelegate{ + delegate: b, + }, nil + } + b.lock.Lock() + defer b.lock.Unlock() + b.queue.Push(BlockID{ + BlockID: b.blockID, + SegID: b.segID, + }) + // TODO: remove the block which fails to open from the queue + err := b.open() + if err != nil { + b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block") + return nil, err } b.incRef() return &bDelegate{ delegate: b, + }, nil +} + +func (b *block) incRef() bool { +loop: + if b.Closed() { + return false + } + r := b.ref.Load() + if b.ref.CompareAndSwap(r, r+1) { + return true } + runtime.Gosched() + goto loop } -func (b *block) dscRef() { - b.ref.Done() +func (b *block) Done() { +loop: + r := b.ref.Load() + if r < 1 { + return + } + if b.ref.CompareAndSwap(r, r-1) { + return + } + runtime.Gosched() + goto loop } -func (b *block) incRef() { - b.ref.AddRunning(1) +func (b *block) waitDone() { +loop: + if b.ref.Load() < 1 { + b.ref.Store(0) + return + } + runtime.Gosched() + goto loop } func (b *block) flush() { @@ -193,19 +243,14 @@ func (b *block) flush() { func (b *block) close() { b.lock.Lock() defer b.lock.Unlock() - if b.isClosed() { - return - } - b.dscRef() - b.ref.SignalAndWait() + b.closed.Store(true) + b.waitDone() for _, closer := range b.closableLst { _ = closer.Close() } - b.closed.Store(true) - b.flushCh.Close() } -func (b *block) isClosed() bool { +func (b *block) Closed() bool { return b.closed.Load() } @@ -215,6 +260,10 @@ func (b *block) String() string { func (b *block) stats() (names []string, stats []observability.Statistics) { names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx) + if 
b.Closed() { + stats = make([]observability.Statistics, 3) + return + } stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats()) return names, stats } @@ -290,6 +339,6 @@ func (d *bDelegate) String() string { } func (d *bDelegate) Close() error { - d.delegate.dscRef() + d.delegate.Done() return nil } diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go index 3cd6531..35c1a5e 100644 --- a/banyand/tsdb/bucket/strategy.go +++ b/banyand/tsdb/bucket/strategy.go @@ -18,8 +18,6 @@ package bucket import ( - "sync" - "github.com/pkg/errors" "go.uber.org/multierr" @@ -39,9 +37,7 @@ type Strategy struct { ctrl Controller current Reporter next Reporter - mux sync.Mutex logger *logger.Logger - stopCh chan struct{} } type StrategyOptions func(*Strategy) @@ -68,9 +64,8 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) return nil, errors.Wrap(ErrInvalidParameter, "controller is absent") } strategy := &Strategy{ - ctrl: ctrl, - ratio: 0.8, - stopCh: make(chan struct{}), + ctrl: ctrl, + ratio: 0.8, } for _, opt := range options { opt(strategy) @@ -93,44 +88,41 @@ func (s *Strategy) Run() { } reset() go func(s *Strategy) { - var err error - bucket: - c := s.current.Report() for { - select { - case status, closed := <-c: - if !closed { - reset() - goto bucket - } - ratio := Ratio(status.Volume) / Ratio(status.Capacity) - if ratio >= s.ratio && s.next == nil { - s.next, err = s.ctrl.Next() - if errors.Is(err, ErrNoMoreBucket) { - return - } - if err != nil { - s.logger.Err(err).Msg("failed to create the next bucket") - } - } - if ratio >= 1.0 { - s.mux.Lock() - s.ctrl.OnMove(s.current, s.next) - s.current = s.next - s.next = nil - s.mux.Unlock() - goto bucket - } - case <-s.stopCh: + if s.current == nil { return } + c := s.current.Report() + s.observe(c) } }(s) } +func (s *Strategy) observe(c Channel) { + var err error + moreBucket := true + for status := range c { + ratio := Ratio(status.Volume) / Ratio(status.Capacity) + if ratio >= s.ratio && s.next == nil && moreBucket { + s.next, err = s.ctrl.Next() + if errors.Is(err, ErrNoMoreBucket) { + moreBucket = false + } else if err != nil { + s.logger.Err(err).Msg("failed to create the next bucket") + } + } + if ratio >= 1.0 { + s.move() + return + } + } +} + +func (s *Strategy) move() { + s.ctrl.OnMove(s.current, s.next) + s.current = s.next + s.next = nil +} + func (s *Strategy) Close() { - close(s.stopCh) - s.mux.Lock() - defer s.mux.Unlock() - s.ctrl.OnMove(s.current, nil) } diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go index 3f94c25..8b0dcc9 100644 --- a/banyand/tsdb/index/writer.go +++ b/banyand/tsdb/index/writer.go @@ -33,7 +33,6 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/partition" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" - "github.com/apache/skywalking-banyandb/pkg/run" ) type CallbackFn func() @@ -52,18 +51,20 @@ type Value struct { } type WriterOptions struct { - ShardNum uint32 - Families []*databasev1.TagFamilySpec - IndexRules []*databasev1.IndexRule - DB tsdb.Supplier + ShardNum uint32 + Families []*databasev1.TagFamilySpec + IndexRules []*databasev1.IndexRule + DB tsdb.Supplier + EnableGlobalIndex bool } type Writer struct { - l *logger.Logger - db tsdb.Supplier - shardNum uint32 - ch *run.Chan[Message] - indexRuleIndex []*partition.IndexRuleLocator + l *logger.Logger + db tsdb.Supplier + shardNum uint32 + enableGlobalIndex bool + ch chan Message + 
indexRuleIndex []*partition.IndexRuleLocator } func NewWriter(ctx context.Context, options WriterOptions) *Writer { @@ -76,29 +77,26 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer { } w.shardNum = options.ShardNum w.db = options.DB + w.enableGlobalIndex = options.EnableGlobalIndex w.indexRuleIndex = partition.ParseIndexRuleLocators(options.Families, options.IndexRules) - w.ch = run.NewChan[Message](make(chan Message)) + w.ch = make(chan Message) w.bootIndexGenerator() return w } func (s *Writer) Write(value Message) { go func(m Message) { - s.ch.Write(m) + s.ch <- m }(value) } func (s *Writer) Close() error { - return s.ch.Close() + return nil } func (s *Writer) bootIndexGenerator() { go func() { - for { - m, more := s.ch.Read() - if !more { - return - } + for m := range s.ch { var err error for _, ruleIndex := range s.indexRuleIndex { rule := ruleIndex.Rule @@ -106,6 +104,10 @@ func (s *Writer) bootIndexGenerator() { case databasev1.IndexRule_LOCATION_SERIES: err = multierr.Append(err, writeLocalIndex(m.LocalWriter, ruleIndex, m.Value)) case databasev1.IndexRule_LOCATION_GLOBAL: + if !s.enableGlobalIndex { + s.l.Warn().Stringer("index-rule", ruleIndex.Rule).Msg("global index is disabled") + continue + } err = multierr.Append(err, s.writeGlobalIndex(m.Scope, ruleIndex, m.LocalWriter.ItemID(), m.Value)) } } diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go index a45f57d..5cdea5b 100644 --- a/banyand/tsdb/metric.go +++ b/banyand/tsdb/metric.go @@ -66,11 +66,11 @@ func (s *shard) runStat() { } func (s *shard) stat() { - defer func() { - if r := recover(); r != nil { - s.l.Warn().Interface("r", r).Msg("recovered") - } - }() + // defer func() { + // if r := recover(); r != nil { + // s.l.Warn().Interface("r", r).Msg("recovered") + // } + // }() seriesStat := s.seriesDatabase.Stats() s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes)) s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes)) @@ -81,7 +81,7 @@ func (s *shard) stat() { segStats.MaxMemBytes += segStat.MaxMemBytes segStats.MemBytes += segStat.MemBytes for _, b := range seg.blockController.blocks() { - if b.closed.Load() { + if b.Closed() { continue } names, bss := b.stats() diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index a758947..800f128 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -77,9 +77,25 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, if err != nil { return nil, err } - if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil { - return nil, err + o := ctx.Value(optionsKey) + if o != nil { + options := o.(DatabaseOpts) + if options.EnableGlobalIndex { + memSize := options.GlobalIndexMemSize + if memSize == 0 { + memSize = 1 << 20 + } + if s.globalIndex, err = kv.OpenStore( + 0, + indexPath, + kv.StoreWithLogger(s.l), + kv.StoreWithMemTableSize(memSize), + ); err != nil { + return nil, err + } + } } + s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l)) if err != nil { return nil, err @@ -90,7 +106,9 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, func (s *segment) close() { s.blockController.close() - s.globalIndex.Close() + if s.globalIndex != nil { + s.globalIndex.Close() + } s.Stop() } @@ -103,6 +121,9 @@ func (s segment) String() string { } func (s *segment) Stats() observability.Statistics { + if s.globalIndex == nil { + return observability.Statistics{} + 
} return s.globalIndex.Stats() } @@ -159,7 +180,6 @@ func (bc *blockController) Next() (bucket.Reporter, error) { if errors.Is(err, ErrEndOfSegment) { return nil, bucket.ErrNoMoreBucket } - err = reporter.open() if err != nil { return nil, err } @@ -201,18 +221,30 @@ func (bc *blockController) Parse(value string) (time.Time, error) { panic("invalid interval unit") } -func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) { - return bc.ensureBlockOpen(bc.search(func(b *block) bool { +func (bc *blockController) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) { + bb := bc.search(func(b *block) bool { return b.Overlapping(timeRange) - })) + }) + if bb == nil { + return nil, nil + } + dd := make([]blockDelegate, len(bb)) + for i, b := range bb { + d, err := b.delegate() + if err != nil { + return nil, err + } + dd[i] = d + } + return dd, nil } -func (bc *blockController) get(blockID uint16) *block { +func (bc *blockController) get(blockID uint16) (blockDelegate, error) { b := bc.getBlock(blockID) if b != nil { - return bc.ensureBlockOpen([]*block{b})[0] + return b.delegate() } - return nil + return nil, nil } func (bc *blockController) getBlock(blockID uint16) *block { @@ -250,26 +282,6 @@ func (bc *blockController) search(matcher func(*block) bool) (bb []*block) { return bb } -func (bc *blockController) ensureBlockOpen(blocks []*block) (openedBlocks []*block) { - if blocks == nil { - return nil - } - for _, b := range blocks { - if b.isClosed() { - if err := b.open(); err != nil { - bc.l.Error().Err(err).Stringer("block", b).Msg("fail to open block") - continue - } - } - openedBlocks = append(openedBlocks, b) - bc.blockQueue.Push(BlockID{ - BlockID: b.blockID, - SegID: b.segID, - }) - } - return openedBlocks -} - func (bc *blockController) closeBlock(blockID uint16) { bc.RLock() defer bc.RUnlock() @@ -309,13 +321,10 @@ func (bc *blockController) open() error { return err } if bc.Current() == nil { - b, err := bc.create(bc.clock.Now()) + _, err := bc.create(bc.clock.Now()) if err != nil { return err } - if err = b.open(); err != nil { - return err - } } return nil } @@ -351,6 +360,7 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) { startTime: starTime, suffix: suffix, blockSize: bc.blockSize, + queue: bc.blockQueue, }); err != nil { return nil, err } diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index bed9555..1696616 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -95,7 +95,10 @@ type series struct { } func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) { - b := s.blockDB.block(id) + b, err := s.blockDB.block(id) + if err != nil { + return nil, nil, err + } if b == nil { return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id) } @@ -111,7 +114,10 @@ func (s *series) ID() common.SeriesID { } func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) { - blocks := s.blockDB.span(timeRange) + blocks, err := s.blockDB.span(timeRange) + if err != nil { + return nil, err + } if len(blocks) < 1 { return nil, ErrEmptySeriesSpan } diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index e2a6ef8..a7eb0f9 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -132,8 +132,8 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID - span(timeRange timestamp.TimeRange) []blockDelegate - block(id GlobalItemID) blockDelegate + span(timeRange timestamp.TimeRange) ([]blockDelegate, error) + block(id GlobalItemID) 
(blockDelegate, error) } var ( @@ -172,16 +172,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { return newSeries(s.context(), id, s), nil } -func (s *seriesDB) block(id GlobalItemID) blockDelegate { +func (s *seriesDB) block(id GlobalItemID) (blockDelegate, error) { seg := s.segCtrl.get(id.segID) if seg == nil { - return nil - } - block := seg.blockController.get(id.blockID) - if block == nil { - return nil + return nil, nil } - return block.delegate() + return seg.blockController.get(id.blockID) } func (s *seriesDB) shardID() common.ShardID { @@ -238,18 +234,20 @@ func (s *seriesDB) List(path Path) (SeriesList, error) { return result, err } -func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate { +func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) { // TODO: return correct blocks result := make([]blockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { - for _, b := range s.blockController.span(timeRange) { - bd := b.delegate() - if bd != nil { - result = append(result, bd) - } + dd, err := s.blockController.span(timeRange) + if err != nil { + return nil, err } + if dd == nil { + continue + } + result = append(result, dd...) } - return result + return result, nil } func (s *seriesDB) context() context.Context { @@ -270,8 +268,19 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl: segCtrl, l: logger.Fetch(ctx, "series_database"), } + memSize := int64(1 << 20) + o := ctx.Value(optionsKey) + if o != nil { + options := o.(DatabaseOpts) + if options.SeriesMemSize > 1 { + memSize = options.SeriesMemSize + } + } var err error - sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l)) + sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", + kv.StoreWithNamedLogger("metadata", sdb.l), + kv.StoreWithMemTableSize(memSize), + ) if err != nil { return nil, err } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index a318a80..6310952 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -126,7 +126,7 @@ func (s *shard) State() (shardState ShardState) { BlockID: b.blockID, }, TimeRange: b.TimeRange, - Closed: b.isClosed(), + Closed: b.Closed(), }) } } @@ -135,6 +135,7 @@ func (s *shard) State() (shardState ShardState) { for i, v := range s.segmentController.blockQueue.All() { shardState.OpenBlocks[i] = v.(BlockID) } + s.l.Info().Interface("", shardState).Msg("state") return shardState } @@ -295,7 +296,6 @@ func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter) event := sc.l.Info() if prev != nil { event.Stringer("prev", prev) - prev.(*segment).blockManageStrategy.Close() } if next != nil { event.Stringer("next", next) diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go index 7ef3839..16da8ac 100644 --- a/banyand/tsdb/shard_test.go +++ b/banyand/tsdb/shard_test.go @@ -67,7 +67,7 @@ var _ = Describe("Shard", func() { t1 := clock.Now() Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -81,7 +81,7 @@ var _ = Describe("Shard", func() { t2 := clock.Now().Add(2 * time.Hour) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ 
SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -99,12 +99,12 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{})) + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{})) By("01/01 13:00 moves to the 2nd block") clock.Add(3 * time.Hour) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -115,7 +115,7 @@ var _ = Describe("Shard", func() { t3 := clock.Now().Add(2 * time.Hour) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -142,7 +142,7 @@ var _ = Describe("Shard", func() { clock.Add(3 * time.Hour) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }).Should(Equal([]tsdb.BlockID{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -157,7 +157,7 @@ var _ = Describe("Shard", func() { t4 := clock.Now().Add(2 * time.Hour) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -191,7 +191,7 @@ var _ = Describe("Shard", func() { clock.Add(3 * time.Hour) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), @@ -210,7 +210,7 @@ var _ = Describe("Shard", func() { t5 := clock.Now().Add(2 * time.Hour) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -251,7 +251,7 @@ var _ = Describe("Shard", func() { clock.Add(3 * time.Hour) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -267,7 +267,7 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -318,7 +318,7 @@ var _ = Describe("Shard", func() { Expect(err).NotTo(HaveOccurred()) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), @@ -334,7 +334,7 @@ var _ = Describe("Shard", 
func() { })) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{ + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 20d8e6e..feeb6b4 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -59,10 +59,10 @@ var ( ErrInvalidShardID = errors.New("invalid shard id") ErrOpenDatabase = errors.New("fails to open the database") - encodingMethodKey = contextEncodingMethodKey{} + optionsKey = contextOptionsKey{} ) -type contextEncodingMethodKey struct{} +type contextOptionsKey struct{} type Supplier interface { SupplyTSDB() Database @@ -84,11 +84,15 @@ type Shard interface { var _ Database = (*database)(nil) type DatabaseOpts struct { - Location string - ShardNum uint32 - EncodingMethod EncodingMethod - SegmentSize IntervalRule - BlockSize IntervalRule + Location string + ShardNum uint32 + EncodingMethod EncodingMethod + SegmentSize IntervalRule + BlockSize IntervalRule + BlockMemSize int64 + SeriesMemSize int64 + EnableGlobalIndex bool + GlobalIndexMemSize int64 } type EncodingMethod struct { @@ -186,7 +190,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { return nil, errors.Wrap(err, "failed to read directory contents failed") } thisContext := context.WithValue(ctx, logger.ContextKey, db.logger) - thisContext = context.WithValue(thisContext, encodingMethodKey, opts.EncodingMethod) + thisContext = context.WithValue(thisContext, optionsKey, opts) if len(entries) > 0 { return loadDatabase(thisContext, db) } diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go index 97c11f2..cd11765 100644 --- a/banyand/tsdb/tsdb_suite_test.go +++ b/banyand/tsdb/tsdb_suite_test.go @@ -26,7 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) -var defaultEventallyTimeout = 30 * time.Second +var defaultEventuallyTimeout = 30 * time.Second func TestTsdb(t *testing.T) { RegisterFailHandler(Fail) diff --git a/dist/LICENSE b/dist/LICENSE index a8afea8..71429fb 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -256,7 +256,7 @@ BSD-3-Clause licenses github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause github.com/spf13/pflag v1.0.5 BSD-3-Clause - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e BSD-3-Clause + golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 BSD-3-Clause @@ -305,7 +305,7 @@ MIT licenses github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT go.etcd.io/bbolt v1.3.6 MIT - go.uber.org/atomic v1.9.0 MIT + go.uber.org/atomic v1.7.0 MIT go.uber.org/multierr v1.8.0 MIT go.uber.org/zap v1.17.0 MIT gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT diff --git a/go.mod b/go.mod index 3b5b9d7..e49e8eb 100644 --- a/go.mod +++ b/go.mod @@ -8,15 +8,20 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/dgraph-io/badger/v3 v3.2011.1 github.com/dgraph-io/ristretto v0.1.0 + github.com/envoyproxy/protoc-gen-validate v0.1.0 + github.com/go-chi/chi/v5 v5.0.7 github.com/golang/mock v1.6.0 - github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.8 
github.com/google/uuid v1.3.0 + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 + github.com/hashicorp/golang-lru v0.5.4 github.com/klauspost/compress v1.15.6 github.com/oklog/run v1.1.0 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.12.2 github.com/rs/zerolog v1.26.1 github.com/spf13/cobra v1.4.0 github.com/spf13/pflag v1.0.5 @@ -25,24 +30,13 @@ require ( go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/server/v3 v3.5.4 go.uber.org/multierr v1.8.0 - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f + golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b google.golang.org/grpc v1.47.0 google.golang.org/protobuf v1.28.0 ) -require ( - github.com/envoyproxy/protoc-gen-validate v0.1.0 - github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 - github.com/hashicorp/golang-lru v0.5.4 - github.com/prometheus/client_golang v1.12.2 - go.uber.org/atomic v1.9.0 - golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f - golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 -) - -require golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect - require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect @@ -53,15 +47,14 @@ require ( github.com/dustin/go-humanize v1.0.0 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect - github.com/go-chi/chi/v5 v5.0.7 github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/flatbuffers v1.12.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect - github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -106,11 +99,14 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect + go.uber.org/atomic v1.7.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect + golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect + golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index d37cd1d..f6d35ae 100644 --- a/go.sum +++ b/go.sum @@ -512,9 +512,8 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= 
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -536,8 +535,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA= +golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -577,8 +576,6 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= -golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/pkg/index/index.go b/pkg/index/index.go index 192a94f..b3eb099 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -46,15 +46,8 @@ func (f FieldKey) Marshal() []byte { } func (f *FieldKey) Unmarshal(raw []byte) error { - switch len(raw) { - case 12: - f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8])) - f.IndexRuleID = convert.BytesToUint32(raw[8:]) - case 4: - f.IndexRuleID = convert.BytesToUint32(raw) - default: - return errors.Wrap(ErrMalformed, "unmarshal field key") - } + f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8])) + f.IndexRuleID = convert.BytesToUint32(raw[8:]) return nil } @@ -72,14 +65,17 @@ func (f Field) Marshal() ([]byte, error) { } func (f *Field) Unmarshal(raw []byte) error { + if len(raw) < 13 { + return errors.WithMessagef(ErrMalformed, "malformed field: expected more than 12, got %d", len(raw)) + } fk := &f.Key - err := 
fk.Unmarshal(raw[:len(raw)-8]) + err := fk.Unmarshal(raw[:12]) if err != nil { return errors.Wrap(err, "unmarshal a field") } - termID := raw[len(raw)-8:] - f.Term = make([]byte, len(termID)) - copy(f.Term, termID) + term := raw[12:] + f.Term = make([]byte, len(term)) + copy(f.Term, term) return nil } diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go index 53de61b..e903098 100644 --- a/pkg/index/lsm/lsm.go +++ b/pkg/index/lsm/lsm.go @@ -55,14 +55,19 @@ func (s *store) Write(field index.Field, itemID common.ItemID) error { } type StoreOpts struct { - Path string - Logger *logger.Logger + Path string + Logger *logger.Logger + MemTableSize int64 } func NewStore(opts StoreOpts) (index.Store, error) { var err error var lsm kv.Store - if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", kv.StoreWithLogger(opts.Logger)); err != nil { + if lsm, err = kv.OpenStore( + 0, + opts.Path+"/lsm", + kv.StoreWithLogger(opts.Logger), + kv.StoreWithMemTableSize(opts.MemTableSize)); err != nil { return nil, err } return &store{ diff --git a/pkg/run/channel.go b/pkg/run/channel.go deleted file mode 100644 index 82a9299..0000000 --- a/pkg/run/channel.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package run - -import ( - "sync" -) - -type Chan[T any] struct { - ch chan T - closer sync.WaitGroup -} - -func NewChan[T any](ch chan T) *Chan[T] { - return &Chan[T]{ - ch: ch, - } -} - -func (c *Chan[T]) Write(item T) { - c.closer.Add(1) - defer c.closer.Done() - c.ch <- item -} - -func (c *Chan[T]) Read() (T, bool) { - item, more := <-c.ch - return item, more -} - -func (c *Chan[T]) Close() error { - c.closer.Wait() - close(c.ch) - return nil -} diff --git a/pkg/test/helpers/fail_interceptor.go b/pkg/test/helpers/fail_interceptor.go index a442d27..b971088 100644 --- a/pkg/test/helpers/fail_interceptor.go +++ b/pkg/test/helpers/fail_interceptor.go @@ -17,8 +17,9 @@ package helpers import ( + "sync/atomic" + "github.com/onsi/gomega/types" - "go.uber.org/atomic" ) type FailInterceptor struct { @@ -29,7 +30,7 @@ type FailInterceptor struct { func NewFailInterceptor(fail types.GomegaFailHandler) *FailInterceptor { return &FailInterceptor{ ginkgoFail: fail, - didFail: atomic.NewBool(false), + didFail: &atomic.Bool{}, } }
