This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e4b98d5605d6cfedcb4bd31d394c7b1748e7f2a3
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Mar 30 12:56:04 2023 +0000

    Integrating Sharded Buffer into TSDB
    
    The Sharded Buffer replaces Badger's memtable for data ingestion.
    Badger KV now only provides the SST (sorted string table) layer.
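    
    A minimal, self-contained sketch of the sharded-buffer idea follows for
    readers skimming the diff. The names shardedBuffer, newShardedBuffer and
    flushFn are hypothetical and exist only for illustration; the actual
    implementation is the Buffer in banyand/tsdb/buffer.go, which shards by
    key hash and hands full skiplists to the SST layer through
    TimeSeriesStore.Handover. For simplicity the sketch counts entries per
    shard, whereas the real Buffer sizes its shards in bytes.
    
        package main
        
        import (
            "fmt"
            "hash/fnv"
            "sync"
        )
        
        // flushFn receives a full shard so its owner can hand the data over
        // to the SST layer (analogous to TimeSeriesStore.Handover in this commit).
        type flushFn func(shardIndex int, entries map[string][]byte) error
        
        type shardedBuffer struct {
            shards    []map[string][]byte
            locks     []sync.Mutex
            flushSize int
            onFlush   flushFn
        }
        
        func newShardedBuffer(numShards, flushSize int, f flushFn) *shardedBuffer {
            b := &shardedBuffer{
                shards:    make([]map[string][]byte, numShards),
                locks:     make([]sync.Mutex, numShards),
                flushSize: flushSize,
                onFlush:   f,
            }
            for i := range b.shards {
                b.shards[i] = make(map[string][]byte, flushSize)
            }
            return b
        }
        
        // shardIndex picks a shard by hashing the key, mirroring the
        // hash-modulo routing used by Buffer.getShardIndex.
        func (b *shardedBuffer) shardIndex(key []byte) int {
            h := fnv.New32a()
            h.Write(key)
            return int(h.Sum32() % uint32(len(b.shards)))
        }
        
        // Write buffers the entry in its shard; once the shard reaches
        // flushSize it is handed to onFlush and replaced by an empty one.
        func (b *shardedBuffer) Write(key, value []byte) error {
            i := b.shardIndex(key)
            b.locks[i].Lock()
            defer b.locks[i].Unlock()
            b.shards[i][string(key)] = value
            if len(b.shards[i]) >= b.flushSize {
                full := b.shards[i]
                b.shards[i] = make(map[string][]byte, b.flushSize)
                return b.onFlush(i, full)
            }
            return nil
        }
        
        func main() {
            buf := newShardedBuffer(2, 3, func(shard int, entries map[string][]byte) error {
                fmt.Printf("handing over shard %d with %d entries\n", shard, len(entries))
                return nil // a real flush would build an SST from these entries
            })
            for i := 0; i < 10; i++ {
                _ = buf.Write([]byte(fmt.Sprintf("series-%d", i)), []byte("value"))
            }
        }
    
    Sharding by key hash spreads concurrent writers across per-shard locks
    instead of a single memtable lock.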
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 banyand/kv/badger.go          |   5 ++
 banyand/kv/kv.go              |  13 +---
 banyand/tsdb/block.go         | 143 +++++++++++++++++++++++++++++++++---------
 banyand/tsdb/buffer.go        |  26 +++++++-
 dist/LICENSE                  |   2 +-
 go.mod                        |   2 +-
 go.sum                        |   4 +-
 test/cases/measure/measure.go |   2 +-
 test/e2e-v2/script/env        |   8 +--
 9 files changed, 156 insertions(+), 49 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/dgraph-io/badger/v3"
        "github.com/dgraph-io/badger/v3/banyandb"
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
 
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
        dbOpts badger.Options
 }
 
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+       return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
        return badgerStats(b.db)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/dgraph-io/badger/v3"
        "github.com/dgraph-io/badger/v3/options"
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
        Reader
 }
 
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
-       // Put a value with a timestamp/version
-       Put(key, val []byte, ts uint64) error
-       // PutAsync a value with a timestamp/version asynchronously.
-       // Injected "f" func will notice the result of value write.
-       PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
 // TimeSeriesReader allows retrieving data from a time-series storage.
 type TimeSeriesReader interface {
        // Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
 type TimeSeriesStore interface {
        observability.Observable
        io.Closer
-       TimeSeriesWriter
+       Handover(skl *skl.Skiplist) error
        TimeSeriesReader
 }
 
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
        if btss.dbOpts.MemTableSize < 8<<20 {
                btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
        }
+       btss.dbOpts = btss.dbOpts.WithInTable()
        var err error
        btss.db, err = badger.Open(btss.dbOpts)
        if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..763f86bd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
@@ -48,40 +49,46 @@ const (
        componentSecondInvertedIdx = "inverted"
        componentSecondLSMIdx      = "lsm"
 
-       defaultMainMemorySize = 8 << 20
-       defaultEnqueueTimeout = 500 * time.Millisecond
+       defaultBufferSize       = 8 << 20
+       defaultEnqueueTimeout   = 500 * time.Millisecond
+       maxBlockAge             = time.Hour
+       defaultWriteConcurrency = 1000
+       defaultNumBufferShards  = 2
 )
 
 var errBlockClosingInterrupted = errors.New("interrupt to close the block")
 
 type block struct {
-       store    kv.TimeSeriesStore
-       openOpts openOpts
-       queue    bucket.Queue
-       bucket.Reporter
-       clock         timestamp.Clock
-       lsmIndex      index.Store
        invertedIndex index.Store
-       closed        *atomic.Bool
-       l             *logger.Logger
-       deleted       *atomic.Bool
-       ref           *atomic.Int32
-       position      common.Position
+       sst           kv.TimeSeriesStore
+       queue         bucket.Queue
+       bucket.Reporter
+       clock            timestamp.Clock
+       lsmIndex         index.Store
+       closed           *atomic.Bool
+       buffer           *Buffer
+       l                *logger.Logger
+       deleted          *atomic.Bool
+       ref              *atomic.Int32
+       closeBufferTimer *time.Timer
+       position         common.Position
        timestamp.TimeRange
        segSuffix   string
        suffix      string
        path        string
        closableLst []io.Closer
+       openOpts    openOpts
        lock        sync.RWMutex
        segID       SectionID
        blockID     SectionID
 }
 
 type openOpts struct {
-       store     []kv.TimeSeriesOptions
-       storePath string
-       inverted  inverted.StoreOpts
-       lsm       lsm.StoreOpts
+       storePath  string
+       inverted   inverted.StoreOpts
+       lsm        lsm.StoreOpts
+       store      []kv.TimeSeriesOptions
+       bufferSize int64
 }
 
 type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
        if options.CompressionMethod.Type == CompressionTypeZSTD {
                b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
        }
-       var memSize int64
+       var bufferSize int64
        if options.BlockMemSize < 1 {
-               memSize = defaultMainMemorySize
+               bufferSize = defaultBufferSize
        } else {
-               memSize = options.BlockMemSize
+               bufferSize = options.BlockMemSize
        }
-       b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+       b.openOpts.bufferSize = bufferSize
+       b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
        b.openOpts.storePath = path.Join(b.path, componentMain)
        b.openOpts.inverted = inverted.StoreOpts{
                Path:         path.Join(b.path, componentSecondInvertedIdx),
                Logger:       b.l.Named(componentSecondInvertedIdx),
                BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
        }
-       lsmMemSize := memSize / 8
+       lsmMemSize := bufferSize / 8
        if lsmMemSize < defaultKVMemorySize {
                lsmMemSize = defaultKVMemorySize
        }
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
 }
 
 func (b *block) open() (err error) {
-       if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
+       if b.isActive() {
+               if err = b.openBuffer(); err != nil {
+                       return err
+               }
+       }
+       if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
                return err
        }
-       b.closableLst = append(b.closableLst, b.store)
+       b.closableLst = append(b.closableLst, b.sst)
        if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
                return err
        }
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
        return nil
 }
 
+func (b *block) openBuffer() (err error) {
+       if b.buffer != nil {
+               return nil
+       }
+       if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
+               defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
+               return err
+       }
+       now := b.clock.Now()
+       max := b.maxTime()
+       closeAfter := max.Sub(now)
+       // TODO: we should move inactive write buffer to segment or shard.
+       if now.After(max) {
+               closeAfter = maxBlockAge
+       }
+       // create a timer to close buffer
+       b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+               if b.l.Debug().Enabled() {
+                       b.l.Debug().Msg("closing buffer")
+               }
+               b.lock.Lock()
+               defer b.lock.Unlock()
+               if b.buffer == nil {
+                       return
+               }
+               if err := b.buffer.Close(); err != nil {
+                       b.l.Error().Err(err).Msg("close buffer error")
+               }
+               b.buffer = nil
+       })
+       return nil
+}
+
+func (b *block) isActive() bool {
+       return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+       return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+       b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+       return b.sst.Handover(skl)
+}
+
 func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
        if b.deleted.Load() {
                return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b)
@@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) {
        case <-ch:
        }
        b.closed.Store(true)
+       if b.closeBufferTimer != nil {
+               b.closeBufferTimer.Stop()
+       }
+       if b.buffer != nil {
+               err = multierr.Append(err, b.buffer.Close())
+       }
        for _, closer := range b.closableLst {
                err = multierr.Append(err, closer.Close())
        }
@@ -308,15 +373,25 @@ func (b *block) String() string {
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
-       names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
        if b.Closed() {
-               stats = make([]observability.Statistics, 3)
+               stats = make([]observability.Statistics, 0)
                return
        }
-       stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+       bnn, bss := b.buffer.Stats()
+       names = append(names, bnn...)
+       stats = append(stats, bss...)
+       names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+       stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
        return names, stats
 }
 
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+       if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+               return v, nil
+       }
+       return b.sst.Get(key, ts)
+}
+
 type blockDelegate interface {
        io.Closer
        contains(ts time.Time) bool
@@ -340,7 +415,7 @@ type bDelegate struct {
 }
 
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
-       return d.delegate.store
+       return d.delegate
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) {
 }
 
 func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
-       return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+       // On-demand open buffer
+       if d.delegate.buffer == nil {
+               d.delegate.lock.Lock()
+               if err := d.delegate.openBuffer(); err != nil {
+                       d.delegate.lock.Unlock()
+                       return err
+               }
+               d.delegate.lock.Unlock()
+       }
+       d.delegate.buffer.Write(key, val, ts)
+       return nil
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..e6f4ac0a 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
        "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
 
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o
 
 // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
 func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
-       if !b.entryCloser.AddRunning() {
+       if b == nil || !b.entryCloser.AddRunning() {
                return
        }
        defer b.entryCloser.Done()
@@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
 
 // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
 func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+       if b == nil || !b.entryCloser.AddRunning() {
+               return nil, false
+       }
+       defer b.entryCloser.Done()
        keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
        index := b.getShardIndex(key)
        epoch := uint64(ts.UnixNano())
@@ -157,6 +162,25 @@ func (b *Buffer) Close() error {
        return nil
 }
 
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+       names := make([]string, b.numShards)
+       stats := make([]observability.Statistics, b.numShards)
+       for i := 0; i < b.numShards; i++ {
+               names[i] = fmt.Sprintf("buffer-%d", i)
+               var size, maxSize int64
+               for _, l := range b.buckets[i].getAll() {
+                       size += l.MemSize()
+                       maxSize += int64(b.buckets[i].size)
+               }
+               stats[i] = observability.Statistics{
+                       MemBytes:    size,
+                       MaxMemBytes: maxSize,
+               }
+       }
+       return names, stats
+}
+
 func (b *Buffer) getShardIndex(key []byte) uint64 {
        return convert.Hash(key) % uint64(b.numShards)
 }
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
 Apache-2.0 licenses
 ========================================================================
 
-    github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+    github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
     github.com/blevesearch/segment v0.9.0 Apache-2.0
     github.com/blevesearch/vellum v1.0.7 Apache-2.0
     github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
        github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
        github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
        github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
-       github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
+       github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
 )
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
 github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
        }
 )
 
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, 
Offset: -20 * time.Minute}),
        g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9
