This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch sst in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e4b98d5605d6cfedcb4bd31d394c7b1748e7f2a3 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Mar 30 12:56:04 2023 +0000 Integrating Sharded Buffer into TSDB The Sharded Buffer will replace Badger's memtable for data ingestion. Currently, Badger KV only provides SST. Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/kv/badger.go | 5 ++ banyand/kv/kv.go | 13 +--- banyand/tsdb/block.go | 143 +++++++++++++++++++++++++++++++++--------- banyand/tsdb/buffer.go | 26 +++++++- dist/LICENSE | 2 +- go.mod | 2 +- go.sum | 4 +- test/cases/measure/measure.go | 2 +- test/e2e-v2/script/env | 8 +-- 9 files changed, 156 insertions(+), 49 deletions(-) diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 7f616d2f..7bc7ee99 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/banyandb" + "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/badger/v3/y" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -49,6 +50,10 @@ type badgerTSS struct { dbOpts badger.Options } +func (b *badgerTSS) Handover(skl *skl.Skiplist) error { + return b.db.HandoverIterator(skl.NewUniIterator(false)) +} + func (b *badgerTSS) Stats() (s observability.Statistics) { return badgerStats(b.db) } diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index 8f85432e..1a9a7cd6 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -25,6 +25,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/options" + "github.com/dgraph-io/badger/v3/skl" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -76,15 +77,6 @@ type Store interface { Reader } -// TimeSeriesWriter allows writing to a time-series storage. -type TimeSeriesWriter interface { - // Put a value with a timestamp/version - Put(key, val []byte, ts uint64) error - // PutAsync a value with a timestamp/version asynchronously. - // Injected "f" func will notice the result of value write. - PutAsync(key, val []byte, ts uint64, f func(error)) error -} - // TimeSeriesReader allows retrieving data from a time-series storage. type TimeSeriesReader interface { // Get a value by its key and timestamp/version @@ -95,7 +87,7 @@ type TimeSeriesReader interface { type TimeSeriesStore interface { observability.Observable io.Closer - TimeSeriesWriter + Handover(skl *skl.Skiplist) error TimeSeriesReader } @@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS if btss.dbOpts.MemTableSize < 8<<20 { btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10) } + btss.dbOpts = btss.dbOpts.WithInTable() var err error btss.db, err = badger.Open(btss.dbOpts) if err != nil { diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 70e94db6..763f86bd 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -29,6 +29,7 @@ import ( "sync/atomic" "time" + "github.com/dgraph-io/badger/v3/skl" "github.com/pkg/errors" "go.uber.org/multierr" @@ -48,40 +49,46 @@ const ( componentSecondInvertedIdx = "inverted" componentSecondLSMIdx = "lsm" - defaultMainMemorySize = 8 << 20 - defaultEnqueueTimeout = 500 * time.Millisecond + defaultBufferSize = 8 << 20 + defaultEnqueueTimeout = 500 * time.Millisecond + maxBlockAge = time.Hour + defaultWriteConcurrency = 1000 + defaultNumBufferShards = 2 ) var errBlockClosingInterrupted = errors.New("interrupt to close the block") type block struct { - store kv.TimeSeriesStore - openOpts openOpts - queue bucket.Queue - bucket.Reporter - clock timestamp.Clock - lsmIndex index.Store invertedIndex index.Store - closed *atomic.Bool - l *logger.Logger - deleted *atomic.Bool - ref *atomic.Int32 - position common.Position + sst kv.TimeSeriesStore + queue bucket.Queue + bucket.Reporter + clock timestamp.Clock + lsmIndex index.Store + closed *atomic.Bool + buffer *Buffer + l *logger.Logger + deleted *atomic.Bool + ref *atomic.Int32 + closeBufferTimer *time.Timer + position common.Position timestamp.TimeRange segSuffix string suffix string path string closableLst []io.Closer + openOpts openOpts lock sync.RWMutex segID SectionID blockID SectionID } type openOpts struct { - store []kv.TimeSeriesOptions - storePath string - inverted inverted.StoreOpts - lsm lsm.StoreOpts + storePath string + inverted inverted.StoreOpts + lsm lsm.StoreOpts + store []kv.TimeSeriesOptions + bufferSize int64 } type blockOpts struct { @@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) { if options.CompressionMethod.Type == CompressionTypeZSTD { b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes)) } - var memSize int64 + var bufferSize int64 if options.BlockMemSize < 1 { - memSize = defaultMainMemorySize + bufferSize = defaultBufferSize } else { - memSize = options.BlockMemSize + bufferSize = options.BlockMemSize } - b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain))) + b.openOpts.bufferSize = bufferSize + b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain))) b.openOpts.storePath = path.Join(b.path, componentMain) b.openOpts.inverted = inverted.StoreOpts{ Path: path.Join(b.path, componentSecondInvertedIdx), Logger: b.l.Named(componentSecondInvertedIdx), BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec, } - lsmMemSize := memSize / 8 + lsmMemSize := bufferSize / 8 if lsmMemSize < defaultKVMemorySize { lsmMemSize = defaultKVMemorySize } @@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) { } func (b *block) open() (err error) { - if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil { + if b.isActive() { + if err = b.openBuffer(); err != nil { + return err + } + } + if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil { return err } - b.closableLst = append(b.closableLst, b.store) + b.closableLst = append(b.closableLst, b.sst) if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil { return err } @@ -193,6 +206,52 @@ func (b *block) open() (err error) { return nil } +func (b *block) openBuffer() (err error) { + if b.buffer != nil { + return nil + } + if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards), + defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil { + return err + } + now := b.clock.Now() + max := b.maxTime() + closeAfter := max.Sub(now) + // TODO: we should move inactive write buffer to segment or shard. + if now.After(max) { + closeAfter = maxBlockAge + } + // create a timer to close buffer + b.closeBufferTimer = time.AfterFunc(closeAfter, func() { + if b.l.Debug().Enabled() { + b.l.Debug().Msg("closing buffer") + } + b.lock.Lock() + defer b.lock.Unlock() + if b.buffer == nil { + return + } + if err := b.buffer.Close(); err != nil { + b.l.Error().Err(err).Msg("close buffer error") + } + b.buffer = nil + }) + return nil +} + +func (b *block) isActive() bool { + return !b.clock.Now().After(b.maxTime()) +} + +func (b *block) maxTime() time.Time { + return b.End.Add(maxBlockAge) +} + +func (b *block) flush(shardIndex int, skl *skl.Skiplist) error { + b.l.Info().Int("shard", shardIndex).Msg("flushing buffer") + return b.sst.Handover(skl) +} + func (b *block) delegate(ctx context.Context) (blockDelegate, error) { if b.deleted.Load() { return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b) @@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) { case <-ch: } b.closed.Store(true) + if b.closeBufferTimer != nil { + b.closeBufferTimer.Stop() + } + if b.buffer != nil { + err = multierr.Append(err, b.buffer.Close()) + } for _, closer := range b.closableLst { err = multierr.Append(err, closer.Close()) } @@ -308,15 +373,25 @@ func (b *block) String() string { } func (b *block) stats() (names []string, stats []observability.Statistics) { - names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx) if b.Closed() { - stats = make([]observability.Statistics, 3) + stats = make([]observability.Statistics, 0) return } - stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats()) + bnn, bss := b.buffer.Stats() + names = append(names, bnn...) + stats = append(stats, bss...) + names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx) + stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats()) return names, stats } +func (b *block) Get(key []byte, ts uint64) ([]byte, error) { + if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok { + return v, nil + } + return b.sst.Get(key, ts) +} + type blockDelegate interface { io.Closer contains(ts time.Time) bool @@ -340,7 +415,7 @@ type bDelegate struct { } func (d *bDelegate) dataReader() kv.TimeSeriesReader { - return d.delegate.store + return d.delegate } func (d *bDelegate) lsmIndexReader() index.Searcher { @@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) { } func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error { - return d.delegate.store.Put(key, val, uint64(ts.UnixNano())) + // On-demand open buffer + if d.delegate.buffer == nil { + d.delegate.lock.Lock() + if err := d.delegate.openBuffer(); err != nil { + d.delegate.lock.Unlock() + return err + } + d.delegate.lock.Unlock() + } + d.delegate.buffer.Write(key, val, ts) + return nil } func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error { diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go index 8372554a..e6f4ac0a 100644 --- a/banyand/tsdb/buffer.go +++ b/banyand/tsdb/buffer.go @@ -25,6 +25,7 @@ import ( "github.com/dgraph-io/badger/v3/skl" "github.com/dgraph-io/badger/v3/y" + "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" @@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer. func (b *Buffer) Write(key, value []byte, timestamp time.Time) { - if !b.entryCloser.AddRunning() { + if b == nil || !b.entryCloser.AddRunning() { return } defer b.entryCloser.Done() @@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) { // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer. func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) { + if b == nil || !b.entryCloser.AddRunning() { + return nil, false + } + defer b.entryCloser.Done() keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano())) index := b.getShardIndex(key) epoch := uint64(ts.UnixNano()) @@ -157,6 +162,25 @@ func (b *Buffer) Close() error { return nil } +// Stats returns the statistics for the buffer. +func (b *Buffer) Stats() ([]string, []observability.Statistics) { + names := make([]string, b.numShards) + stats := make([]observability.Statistics, b.numShards) + for i := 0; i < b.numShards; i++ { + names[i] = fmt.Sprintf("buffer-%d", i) + var size, maxSize int64 + for _, l := range b.buckets[i].getAll() { + size += l.MemSize() + maxSize += int64(b.buckets[i].size) + } + stats[i] = observability.Statistics{ + MemBytes: size, + MaxMemBytes: maxSize, + } + } + return names, stats +} + func (b *Buffer) getShardIndex(key []byte) uint64 { return convert.Hash(key) % uint64(b.numShards) } diff --git a/dist/LICENSE b/dist/LICENSE index 75485dd4..d04bca78 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -178,7 +178,7 @@ Apache-2.0 licenses ======================================================================== - github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0 + github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0 github.com/blevesearch/segment v0.9.0 Apache-2.0 github.com/blevesearch/vellum v1.0.7 Apache-2.0 github.com/coreos/go-semver v0.3.0 Apache-2.0 diff --git a/go.mod b/go.mod index aba7feed..a11d213a 100644 --- a/go.mod +++ b/go.mod @@ -136,5 +136,5 @@ replace ( github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5 github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0 github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3 - github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f + github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b ) diff --git a/go.sum b/go.sum index c5d2e026..ef0a79f4 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo= github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= -github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw= -github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag= +github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU= +github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index b906d86b..4db0b05d 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -39,7 +39,7 @@ var ( } ) -var _ = g.FDescribeTable("Scanning Measures", verify, +var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index 493eb132..52d79dc8 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772 -SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653 -SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b +SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5 +SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068 -SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0 -SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a +SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819 +SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9