This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f5a645c Integrating Sharded Buffer into TSDB (#263)
1f5a645c is described below

commit 1f5a645ccd98576e425b71a2ce249bff0927f7b3
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Apr 6 22:39:49 2023 +0800

    Integrating Sharded Buffer into TSDB (#263)
    
    * Integrating Sharded Buffer into TSDB. The Sharded Buffer will replace
      Badger's memtable for data ingestion. Currently, Badger KV only
      provides SST.
    
    * Sync e2e cases with the main repo.
    
    * Fix buffer crashing
    
    * Clear volume counter after flushing
    
    * Fix unstopped loop
    
    ---------
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   2 +-
 banyand/Dockerfile                                 |   2 +-
 banyand/kv/badger.go                               |   5 +
 banyand/kv/kv.go                                   |  13 +-
 banyand/measure/measure_write.go                   |   4 +-
 banyand/metadata/schema/checker_test.go            |   4 +-
 banyand/metadata/schema/error.go                   |   4 +-
 banyand/metadata/schema/etcd.go                    |  10 +-
 banyand/tsdb/block.go                              | 165 +++++++++++++++------
 banyand/tsdb/buffer.go                             |  82 +++++++---
 banyand/tsdb/metric.go                             |  25 ++--
 dist/LICENSE                                       |   2 +-
 go.mod                                             |   2 +-
 go.sum                                             |   4 +-
 pkg/index/inverted/inverted.go                     |   3 +
 pkg/pb/v1/write.go                                 |   2 +-
 pkg/run/closer.go                                  |  17 +++
 pkg/schema/metadata.go                             |   3 +-
 test/cases/measure/measure.go                      |   2 +-
 .../ebpf/oncpu/expected/profiling-task-list.yml    |   3 +
 .../trace/expected/profile-segment-list.yml        |  37 ++++-
 .../cases/profiling/trace/profiling-cases.yaml     |  24 +--
 .../cases/storage/expected/dependency-instance.yml |   2 +-
 .../expected/dependency-services-consumer.yml      |   9 +-
 .../expected/dependency-services-provider.yml      |   9 +-
 .../expected/metrics-has-value-percentile.yml      |  40 ++++-
 .../cases/storage/expected/metrics-has-value.yml   |   8 +-
 .../metrics-nullable-single-sla-empty.yml}         |  27 +---
 ...s-value.yml => metrics-nullable-single-sla.yml} |   6 +-
 ...etrics-has-value.yml => metrics-single-sla.yml} |   5 +-
 test/e2e-v2/script/env                             |   8 +-
 31 files changed, 354 insertions(+), 175 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 17c09ea6..fc47300b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,7 +9,7 @@ Release Notes.
 - Add TSDB concept document.
 - [UI] Add YAML editor for inputting query criteria.
 - Refactor TopN to support `NULL` group while keeping seriesID from the source 
measure.
-- Add a sharded buffer to TSDB.
+- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only 
provides SST.
 
 ### Chores
 
diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index 72cdbbcd..3b5af6ba 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -27,7 +27,7 @@ EXPOSE 2345
 
 ENTRYPOINT ["air"]
 
-FROM golang:1.19 AS base
+FROM golang:1.20 AS base
 
 ENV GOPATH "/go"
 ENV GO111MODULE "on"
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/dgraph-io/badger/v3"
        "github.com/dgraph-io/badger/v3/banyandb"
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
 
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
        dbOpts badger.Options
 }
 
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+       return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
        return badgerStats(b.db)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/dgraph-io/badger/v3"
        "github.com/dgraph-io/badger/v3/options"
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
        Reader
 }
 
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
-       // Put a value with a timestamp/version
-       Put(key, val []byte, ts uint64) error
-       // PutAsync a value with a timestamp/version asynchronously.
-       // Injected "f" func will notice the result of value write.
-       PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
 // TimeSeriesReader allows retrieving data from a time-series storage.
 type TimeSeriesReader interface {
        // Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
 type TimeSeriesStore interface {
        observability.Observable
        io.Closer
-       TimeSeriesWriter
+       Handover(skl *skl.Skiplist) error
        TimeSeriesReader
 }
 
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options 
...TimeSeriesOptions) (TimeSeriesS
        if btss.dbOpts.MemTableSize < 8<<20 {
                btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
        }
+       btss.dbOpts = btss.dbOpts.WithInTable()
        var err error
        btss.db, err = badger.Open(btss.dbOpts)
        if err != nil {
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index b3615741..e2a9f54c 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -185,9 +185,9 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue) 
[]byte {
        case *modelv1.FieldValue_Float:
                return convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
        case *modelv1.FieldValue_Str:
-               return []byte(fieldValue.GetStr().Value)
+               return []byte(fieldValue.GetStr().GetValue())
        case *modelv1.FieldValue_BinaryData:
-               return fieldValue.GetBinaryData()
+               return bytes.Clone(fieldValue.GetBinaryData())
        }
        return nil
 }
diff --git a/banyand/metadata/schema/checker_test.go 
b/banyand/metadata/schema/checker_test.go
index 2280d88c..d15f814d 100644
--- a/banyand/metadata/schema/checker_test.go
+++ b/banyand/metadata/schema/checker_test.go
@@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Utils", func() {
                })
 
                ginkgo.AfterEach(func() {
-                       
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
+                       Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
                })
 
                ginkgo.It("should be equal if nothing changed", func() {
@@ -188,7 +188,7 @@ var _ = ginkgo.Describe("Utils", func() {
                })
 
                ginkgo.AfterEach(func() {
-                       
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
+                       Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
                })
 
                ginkgo.It("should be equal if nothing changed", func() {
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index 00e32eaa..f220c281 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -27,6 +27,8 @@ import (
 var (
        // ErrGRPCResourceNotFound indicates the resource doesn't exist.
        ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
+       // ErrClosed indicates the registry is closed.
+       ErrClosed = errors.New("metadata registry is closed")
 
        statusGRPCInvalidArgument  = status.New(codes.InvalidArgument, 
"banyandb: input is invalid")
        statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: 
resource not found")
@@ -34,8 +36,6 @@ var (
        errGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
        statusDataLoss             = status.New(codes.DataLoss, "banyandb: 
resource corrupts.")
        errGRPCDataLoss            = statusDataLoss.Err()
-
-       errClosed = errors.New("metadata registry is closed")
 )
 
 // BadRequest creates a gRPC error with error details with type BadRequest,
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 91b0b243..d87512de 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -200,7 +200,7 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
 
 func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message 
proto.Message) error {
        if !e.closer.AddRunning() {
-               return errClosed
+               return ErrClosed
        }
        defer e.closer.Done()
        resp, err := e.client.Get(ctx, key)
@@ -229,7 +229,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key 
string, message proto.
 // Otherwise, it will return ErrGRPCResourceNotFound.
 func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) 
error {
        if !e.closer.AddRunning() {
-               return errClosed
+               return ErrClosed
        }
        defer e.closer.Done()
        key, err := metadata.key()
@@ -281,7 +281,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, 
metadata Metadata) erro
 // Otherwise, it will return ErrGRPCAlreadyExists.
 func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) 
error {
        if !e.closer.AddRunning() {
-               return errClosed
+               return ErrClosed
        }
        defer e.closer.Done()
        key, err := metadata.key()
@@ -314,7 +314,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, 
metadata Metadata) erro
 
 func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix 
string, factory func() proto.Message) ([]proto.Message, error) {
        if !e.closer.AddRunning() {
-               return nil, errClosed
+               return nil, ErrClosed
        }
        defer e.closer.Done()
        resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(), 
clientv3.WithRange(incrementLastByte(prefix)))
@@ -343,7 +343,7 @@ func listPrefixesForEntity(group, entityPrefix string) 
string {
 
 func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) 
(bool, error) {
        if !e.closer.AddRunning() {
-               return false, errClosed
+               return false, ErrClosed
        }
        defer e.closer.Done()
        key, err := metadata.key()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..1357e458 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/dgraph-io/badger/v3/skl"
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
@@ -48,40 +49,46 @@ const (
        componentSecondInvertedIdx = "inverted"
        componentSecondLSMIdx      = "lsm"
 
-       defaultMainMemorySize = 8 << 20
-       defaultEnqueueTimeout = 500 * time.Millisecond
+       defaultBufferSize       = 8 << 20
+       defaultEnqueueTimeout   = 500 * time.Millisecond
+       maxBlockAge             = time.Hour
+       defaultWriteConcurrency = 1000
+       defaultNumBufferShards  = 2
 )
 
 var errBlockClosingInterrupted = errors.New("interrupt to close the block")
 
 type block struct {
-       store    kv.TimeSeriesStore
-       openOpts openOpts
-       queue    bucket.Queue
-       bucket.Reporter
-       clock         timestamp.Clock
-       lsmIndex      index.Store
        invertedIndex index.Store
-       closed        *atomic.Bool
-       l             *logger.Logger
-       deleted       *atomic.Bool
-       ref           *atomic.Int32
-       position      common.Position
+       sst           kv.TimeSeriesStore
+       queue         bucket.Queue
+       bucket.Reporter
+       clock            timestamp.Clock
+       lsmIndex         index.Store
+       closed           *atomic.Bool
+       buffer           *Buffer
+       l                *logger.Logger
+       deleted          *atomic.Bool
+       ref              *atomic.Int32
+       closeBufferTimer *time.Timer
+       position         common.Position
        timestamp.TimeRange
        segSuffix   string
        suffix      string
        path        string
        closableLst []io.Closer
+       openOpts    openOpts
        lock        sync.RWMutex
        segID       SectionID
        blockID     SectionID
 }
 
 type openOpts struct {
-       store     []kv.TimeSeriesOptions
-       storePath string
-       inverted  inverted.StoreOpts
-       lsm       lsm.StoreOpts
+       storePath  string
+       inverted   inverted.StoreOpts
+       lsm        lsm.StoreOpts
+       store      []kv.TimeSeriesOptions
+       bufferSize int64
 }
 
 type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
        if options.CompressionMethod.Type == CompressionTypeZSTD {
                b.openOpts.store = append(b.openOpts.store, 
kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
        }
-       var memSize int64
+       var bufferSize int64
        if options.BlockMemSize < 1 {
-               memSize = defaultMainMemorySize
+               bufferSize = defaultBufferSize
        } else {
-               memSize = options.BlockMemSize
+               bufferSize = options.BlockMemSize
        }
-       b.openOpts.store = append(b.openOpts.store, 
kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+       b.openOpts.bufferSize = bufferSize
+       b.openOpts.store = append(b.openOpts.store, 
kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
        b.openOpts.storePath = path.Join(b.path, componentMain)
        b.openOpts.inverted = inverted.StoreOpts{
                Path:         path.Join(b.path, componentSecondInvertedIdx),
                Logger:       b.l.Named(componentSecondInvertedIdx),
                BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
        }
-       lsmMemSize := memSize / 8
+       lsmMemSize := bufferSize / 8
        if lsmMemSize < defaultKVMemorySize {
                lsmMemSize = defaultKVMemorySize
        }
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
 }
 
 func (b *block) open() (err error) {
-       if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, 
b.openOpts.store...); err != nil {
+       if b.isActive() {
+               if err = b.openBuffer(); err != nil {
+                       return err
+               }
+       }
+       if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, 
b.openOpts.store...); err != nil {
                return err
        }
-       b.closableLst = append(b.closableLst, b.store)
+       b.closableLst = append(b.closableLst, b.sst)
        if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err 
!= nil {
                return err
        }
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
        return nil
 }
 
+func (b *block) openBuffer() (err error) {
+       if b.buffer != nil {
+               return nil
+       }
+       if b.buffer, err = NewBuffer(b.l, 
int(b.openOpts.bufferSize/defaultNumBufferShards),
+               defaultWriteConcurrency, defaultNumBufferShards, b.flush); err 
!= nil {
+               return err
+       }
+       now := b.clock.Now()
+       max := b.maxTime()
+       closeAfter := max.Sub(now)
+       // TODO: we should move inactive write buffer to segment or shard.
+       if now.After(max) {
+               closeAfter = maxBlockAge
+       }
+       // create a timer to close buffer
+       b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+               if b.l.Debug().Enabled() {
+                       b.l.Debug().Msg("closing buffer")
+               }
+               b.lock.Lock()
+               defer b.lock.Unlock()
+               if b.buffer == nil {
+                       return
+               }
+               if err := b.buffer.Close(); err != nil {
+                       b.l.Error().Err(err).Msg("close buffer error")
+               }
+               b.buffer = nil
+       })
+       return nil
+}
+
+func (b *block) isActive() bool {
+       return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+       return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+       b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+       return b.sst.Handover(skl)
+}
+
 func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
        if b.deleted.Load() {
                return nil, errors.WithMessagef(errBlockAbsent, "block %s is 
deleted", b)
@@ -225,29 +284,15 @@ func (b *block) delegate(ctx context.Context) 
(blockDelegate, error) {
 }
 
 func (b *block) incRef() bool {
-loop:
        if b.Closed() {
                return false
        }
-       r := b.ref.Load()
-       if b.ref.CompareAndSwap(r, r+1) {
-               return true
-       }
-       runtime.Gosched()
-       goto loop
+       b.ref.Add(1)
+       return true
 }
 
 func (b *block) Done() {
-loop:
-       r := b.ref.Load()
-       if r < 1 {
-               return
-       }
-       if b.ref.CompareAndSwap(r, r-1) {
-               return
-       }
-       runtime.Gosched()
-       goto loop
+       b.ref.Add(-1)
 }
 
 func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
@@ -284,6 +329,12 @@ func (b *block) close(ctx context.Context) (err error) {
        case <-ch:
        }
        b.closed.Store(true)
+       if b.closeBufferTimer != nil {
+               b.closeBufferTimer.Stop()
+       }
+       if b.buffer != nil {
+               err = multierr.Append(err, b.buffer.Close())
+       }
        for _, closer := range b.closableLst {
                err = multierr.Append(err, closer.Close())
        }
@@ -308,15 +359,27 @@ func (b *block) String() string {
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
-       names = append(names, componentMain, componentSecondInvertedIdx, 
componentSecondLSMIdx)
        if b.Closed() {
-               stats = make([]observability.Statistics, 3)
+               stats = make([]observability.Statistics, 0)
                return
        }
-       stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), 
b.lsmIndex.Stats())
+       bnn, bss := b.buffer.Stats()
+       if bnn != nil {
+               names = append(names, bnn...)
+               stats = append(stats, bss...)
+       }
+       names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+       stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
        return names, stats
 }
 
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+       if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+               return v, nil
+       }
+       return b.sst.Get(key, ts)
+}
+
 type blockDelegate interface {
        io.Closer
        contains(ts time.Time) bool
@@ -340,7 +403,7 @@ type bDelegate struct {
 }
 
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
-       return d.delegate.store
+       return d.delegate
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +427,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID 
SectionID) {
 }
 
 func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
-       return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+       // On-demand open buffer
+       if d.delegate.buffer == nil {
+               d.delegate.lock.Lock()
+               if err := d.delegate.openBuffer(); err != nil {
+                       d.delegate.lock.Unlock()
+                       return err
+               }
+               d.delegate.lock.Unlock()
+       }
+       d.delegate.buffer.Write(key, val, ts)
+       return nil
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) 
error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..75ba8e96 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -21,16 +21,21 @@ import (
        "fmt"
        "sync"
        "time"
+       "unsafe"
 
        "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
 
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-const defaultSize = 1 << 20 // 1MB
+const (
+       defaultSize = 1 << 20 // 1MB
+       nodeAlign   = int(unsafe.Sizeof(uint64(0))) - 1
+)
 
 type operation struct {
        key   []byte
@@ -53,8 +58,7 @@ type bufferShardBucket struct {
        log            *logger.Logger
        immutables     []*skl.Skiplist
        index          int
-       flushSize      int
-       size           int
+       capacity       int
        mutex          sync.RWMutex
 }
 
@@ -72,10 +76,6 @@ type Buffer struct {
 
 // NewBuffer creates a new Buffer instance with the given parameters.
 func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, 
onFlushFn onFlush) (*Buffer, error) {
-       size := flushSize
-       if size < defaultSize {
-               size = defaultSize
-       }
        buckets := make([]bufferShardBucket, numShards)
        buffer := &Buffer{
                buckets:     buckets,
@@ -89,9 +89,8 @@ func NewBuffer(log *logger.Logger, flushSize, 
writeConcurrency, numShards int, o
        for i := 0; i < numShards; i++ {
                buckets[i] = bufferShardBucket{
                        index:          i,
-                       size:           size,
-                       mutable:        skl.NewSkiplist(int64(size)),
-                       flushSize:      flushSize,
+                       capacity:       flushSize,
+                       mutable:        skl.NewSkiplist(int64(flushSize)),
                        writeCh:        make(chan operation, writeConcurrency),
                        flushCh:        make(chan flushEvent, 1),
                        writeWaitGroup: &buffer.writeWaitGroup,
@@ -105,7 +104,7 @@ func NewBuffer(log *logger.Logger, flushSize, 
writeConcurrency, numShards int, o
 
 // Write adds a key-value pair with a timestamp to the appropriate shard 
bucket in the buffer.
 func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
-       if !b.entryCloser.AddRunning() {
+       if b == nil || !b.entryCloser.AddRunning() {
                return
        }
        defer b.entryCloser.Done()
@@ -119,10 +118,16 @@ func (b *Buffer) Write(key, value []byte, timestamp 
time.Time) {
 
 // Read retrieves the value associated with the given key and timestamp from 
the appropriate shard bucket in the buffer.
 func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+       if b == nil || !b.entryCloser.AddRunning() {
+               return nil, false
+       }
+       defer b.entryCloser.Done()
        keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
        index := b.getShardIndex(key)
        epoch := uint64(ts.UnixNano())
-       for _, bk := range b.buckets[index].getAll() {
+       ll, deferFn := b.buckets[index].getAll()
+       defer deferFn()
+       for _, bk := range ll {
                value := bk.Get(keyWithTS)
                if value.Meta == 0 && value.Value == nil {
                        continue
@@ -157,11 +162,41 @@ func (b *Buffer) Close() error {
        return nil
 }
 
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+       if b == nil || !b.entryCloser.AddRunning() {
+               return nil, nil
+       }
+       names := make([]string, b.numShards)
+       stats := make([]observability.Statistics, b.numShards)
+       size := func(bucket *bufferShardBucket) (size int64, maxSize int64) {
+               ll, deferFn := bucket.getAll()
+               defer deferFn()
+               for _, l := range ll {
+                       if l == nil {
+                               continue
+                       }
+                       size += l.MemSize()
+                       maxSize += int64(bucket.capacity)
+               }
+               return
+       }
+       for i := 0; i < b.numShards; i++ {
+               names[i] = fmt.Sprintf("buffer-%d", i)
+               size, maxSize := size(&b.buckets[i])
+               stats[i] = observability.Statistics{
+                       MemBytes:    size,
+                       MaxMemBytes: maxSize,
+               }
+       }
+       return names, stats
+}
+
 func (b *Buffer) getShardIndex(key []byte) uint64 {
        return convert.Hash(key) % uint64(b.numShards)
 }
 
-func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
+func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
        bsb.mutex.RLock()
        defer bsb.mutex.RUnlock()
        allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
@@ -172,7 +207,11 @@ func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
                allList[i+1] = bsb.immutables[last-i]
                bsb.immutables[last-i].IncrRef()
        }
-       return allList
+       return allList, func() {
+               for _, l := range allList {
+                       l.DecrRef()
+               }
+       }
 }
 
 func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
@@ -192,22 +231,27 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
        }()
        go func() {
                defer bsb.writeWaitGroup.Done()
+               volume := 0
                for op := range bsb.writeCh {
-                       bsb.mutex.Lock()
-                       if bsb.mutable.MemSize() >= int64(bsb.flushSize) {
+                       k := y.KeyWithTs(op.key, op.epoch)
+                       v := y.ValueStruct{Value: op.value}
+                       volume += len(k) + int(v.EncodedSize()) + 
skl.MaxNodeSize + nodeAlign
+                       if volume >= bsb.capacity || bsb.mutable.MemSize() >= 
int64(bsb.capacity) {
                                select {
                                case bsb.flushCh <- flushEvent{data: 
bsb.mutable}:
                                default:
                                }
+                               volume = 0
+                               bsb.mutex.Lock()
                                bsb.swap()
+                               bsb.mutex.Unlock()
                        }
-                       bsb.mutex.Unlock()
-                       bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch), 
y.ValueStruct{Value: op.value, Version: op.epoch})
+                       bsb.mutable.Put(k, v)
                }
        }()
 }
 
 func (bsb *bufferShardBucket) swap() {
        bsb.immutables = append(bsb.immutables, bsb.mutable)
-       bsb.mutable = skl.NewSkiplist(int64(bsb.size))
+       bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
 }
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index 40340b53..bdd0d91f 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -18,8 +18,10 @@
 package tsdb
 
 import (
+       "fmt"
        "time"
 
+       "github.com/pkg/errors"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
@@ -50,17 +52,22 @@ func init() {
        )
 }
 
-func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
+func (s *shard) stat(_ time.Time, _ *logger.Logger) (r bool) {
+       r = true
        defer func() {
                if r := recover(); r != nil {
-                       s.l.Warn().Interface("r", r).Msg("recovered")
+                       err, ok := r.(error)
+                       if !ok {
+                               err = fmt.Errorf("%v", r)
+                       }
+                       s.l.Warn().Err(errors.WithStack(err)).Msg("recovered")
                }
        }()
        seriesStat := s.seriesDatabase.Stats()
        
s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
        
s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
        segStats := observability.Statistics{}
-       blockStats := newBlockStat()
+       blockStats := make(map[string]observability.Statistics)
        for _, seg := range s.segmentController.segments() {
                segStat := seg.Stats()
                segStats.MaxMemBytes += segStat.MaxMemBytes
@@ -75,6 +82,8 @@ func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
                                if ok {
                                        bsc.MaxMemBytes += bs.MaxMemBytes
                                        bsc.MemBytes += bs.MemBytes
+                               } else {
+                                       blockStats[names[i]] = bs
                                }
                        }
                }
@@ -85,7 +94,7 @@ func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
                s.curry(mtBytes).WithLabelValues(name).Set(float64(bs.MemBytes))
                
s.curry(maxMtBytes).WithLabelValues(name).Set(float64(bs.MaxMemBytes))
        }
-       return true
+       return
 }
 
 func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec {
@@ -95,11 +104,3 @@ func (s *shard) curry(gv *prometheus.GaugeVec) 
*prometheus.GaugeVec {
                "shard":    s.position.Shard,
        })
 }
-
-func newBlockStat() map[string]*observability.Statistics {
-       return map[string]*observability.Statistics{
-               componentMain:              {},
-               componentSecondInvertedIdx: {},
-               componentSecondLSMIdx:      {},
-       }
-}
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
 Apache-2.0 licenses
 ========================================================================
 
-    github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+    github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
     github.com/blevesearch/segment v0.9.0 Apache-2.0
     github.com/blevesearch/vellum v1.0.7 Apache-2.0
     github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
        github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
        github.com/blugelabs/bluge_segment_api => 
github.com/zinclabs/bluge_segment_api v1.0.0
        github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
-       github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 
v3.0.0-20221227124922-b88a2f7d336f
+       github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 
v3.0.0-20230329010346-dfdf57f0581b
 )
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 
h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
 github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/RoaringBitmap/roaring v0.9.4 
h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
 github.com/RoaringBitmap/roaring v0.9.4/go.mod 
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f 
h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod 
h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b 
h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod 
h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 
h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod 
h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod 
h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 3020f3c4..ff4cd4d4 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -165,6 +165,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
        if err != nil {
                return nil, err
        }
+       defer reader.Close()
        fk := fieldKey.MarshalIndexRule()
        var query bluge.Query
        shouldDecodeTerm := true
@@ -210,6 +211,7 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
        if err != nil {
                return nil, err
        }
+       defer reader.Close()
        fk := field.Key.MarshalIndexRule()
        var query bluge.Query
        shouldDecodeTerm := true
@@ -242,6 +244,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches 
[]string) (posting.List,
        if err != nil {
                return nil, err
        }
+       defer reader.Close()
        analyzer := analyzers[fieldKey.Analyzer]
        fk := fieldKey.MarshalIndexRule()
        query := bluge.NewBooleanQuery()
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 600df45c..5ff2bb09 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -148,7 +148,7 @@ func ParseTagValue(tagValue *modelv1.TagValue) (TagValue, 
error) {
                }
                return *fv, nil
        case *modelv1.TagValue_BinaryData:
-               return newValue(x.BinaryData), nil
+               return newValue(bytes.Clone(x.BinaryData)), nil
        case *modelv1.TagValue_Id:
                return newValue([]byte(x.Id.GetValue())), nil
        }
diff --git a/pkg/run/closer.go b/pkg/run/closer.go
index 41d42053..5f718ee4 100644
--- a/pkg/run/closer.go
+++ b/pkg/run/closer.go
@@ -22,6 +22,8 @@ import (
        "sync"
 )
 
+var dummyCloserChan <-chan struct{}
+
 // Closer can close a goroutine then wait for it to stop.
 type Closer struct {
        ctx     context.Context
@@ -41,6 +43,9 @@ func NewCloser(initial int) *Closer {
 
 // AddRunning adds a running task.
 func (c *Closer) AddRunning() bool {
+       if c == nil {
+               return false
+       }
        c.lock.RLock()
        defer c.lock.RUnlock()
        if c.closed {
@@ -52,16 +57,25 @@ func (c *Closer) AddRunning() bool {
 
 // CloseNotify receives a signal from Close.
 func (c *Closer) CloseNotify() <-chan struct{} {
+       if c == nil {
+               return dummyCloserChan
+       }
        return c.ctx.Done()
 }
 
 // Done notifies that one task is done.
 func (c *Closer) Done() {
+       if c == nil {
+               return
+       }
        c.waiting.Done()
 }
 
 // CloseThenWait closes all tasks then waits till they are done.
 func (c *Closer) CloseThenWait() {
+       if c == nil {
+               return
+       }
        c.cancel()
        c.lock.Lock()
        c.closed = true
@@ -71,6 +85,9 @@ func (c *Closer) CloseThenWait() {
 
 // Closed returns whether the Closer is closed.
 func (c *Closer) Closed() bool {
+       if c == nil {
+               return true
+       }
        c.lock.RLock()
        defer c.lock.RUnlock()
        return c.closed
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index c2f6a90c..103958d5 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -34,6 +34,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -196,7 +197,7 @@ func (sr *schemaRepo) Watcher() {
                                                        err = 
sr.deleteResource(evt.Metadata)
                                                }
                                        }
-                                       if err != nil {
+                                       if err != nil && !errors.Is(err, 
schema.ErrClosed) {
                                                
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
                                                select {
                                                case sr.eventCh <- evt:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
        }
 )
 
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, 
Offset: -20 * time.Minute}),
        g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
diff --git 
a/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml 
b/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
index faa04bc5..f357fd6f 100644
--- a/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
@@ -19,6 +19,8 @@
   servicename: sqrt
   serviceinstanceid: null
   serviceinstancename: null
+  processid: null
+  processname: null
   processlabels:
     {{- contains .processlabels }}
     - e2e-label1
@@ -29,4 +31,5 @@
   fixedtriggerduration: 60
   targettype: ON_CPU
   createtime: {{ gt .createtime 0 }}
+  continuousprofilingcauses: []
 {{- end }}
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml 
b/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
index a9230468..5c45b8e7 100644
--- a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
+++ b/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
@@ -14,14 +14,39 @@
 # limitations under the License.
 
 {{- contains . }}
-- segmentid: {{ notEmpty .segmentid }}
+- traceid: {{ notEmpty .traceid }}
+  instanceid: {{ notEmpty .instanceid }}
+  instancename: provider1
   endpointnames:
     - POST:/profile/{name}
   duration: {{ gt .duration 0 }}
   start: "{{ notEmpty .start }}"
-  iserror: false
-  traceids:
-  {{- contains .traceids }}
-    - {{ notEmpty . }}
-  {{- end }}
+  spans:
+  {{- contains .spans}}
+  - spanid: {{ ge .spanid 0 }}
+    parentspanid: {{ .parentspanid }}
+    segmentid: {{ notEmpty .segmentid }}
+    refs: []
+    servicecode: e2e-service-provider
+    serviceinstancename: provider1
+    starttime: {{ gt .starttime 0 }}
+    endtime: {{ gt .endtime 0 }}
+    endpointname: POST:/profile/{name}
+    type: Entry
+    peer: ""
+    component: SpringMVC
+    iserror: false
+    layer: Http
+    tags:
+      {{- contains .tags }}
+      - key: url
+        value: {{ notEmpty .value }}
+      - key: http.method
+        value: POST
+      - key: http.params
+        value: "e2e=[true]"
+      {{- end }}
+    logs: []
+    profiled: true
+  {{- end}}
 {{- end }}
diff --git a/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml 
b/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
index 7a395c2a..69fc6557 100644
--- a/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
+++ b/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
@@ -55,22 +55,22 @@
           swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list 
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e 
'.[0].id' - \
         )
       expected: expected/profile-segment-list.yml
-    # profiled segment detail
-    - query: |
-        swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace 
profiled-segment --segment-id=$( \
-          swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list 
--task-id=$( \
-            swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list 
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e 
'.[0].id' - \
-          ) | yq e '.[0].segmentid' - \
-        )
-      expected: expected/profile-segment-detail.yml
     # query profiled segment analyze
     - query: |
         segmentid=$( \
           swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list 
--task-id=$( \
             swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list 
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e 
'.[0].id' - \
-          ) | yq e '.[0].segmentid' - \
+          ) | yq e '.[0].spans.[] | select(.spanid == 0) | .segmentid' - \
+        );
+        start=$(
+          swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list 
--task-id=$( \
+            swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list 
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e 
'.[0].id' - \
+          ) | yq e '.[0].spans.[] | select(.spanid == 0) | .starttime' - \
+        );
+        end=$(
+          swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list 
--task-id=$( \
+            swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list 
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e 
'.[0].id' - \
+          ) | yq e '.[0].spans.[] | select(.spanid == 0) | .endtime' - \
         );
-        start=$(swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace 
profiled-segment --segment-id=$segmentid | yq e '.spans[] | select(.spanid == 
0).starttime' -);
-        end=$(swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace 
profiled-segment --segment-id=$segmentid | yq e '.spans[] | select(.spanid == 
0).endtime' -);
-        swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace analysis 
--segment-id=$segmentid --time-ranges=$(echo $start"-"$end)
+        swctl --display yaml 
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace analysis 
--segment-ids=$segmentid --time-ranges=$(echo $start"-"$end)
       expected: expected/profile-segment-analyze.yml
diff --git a/test/e2e-v2/cases/storage/expected/dependency-instance.yml 
b/test/e2e-v2/cases/storage/expected/dependency-instance.yml
index d3b9547c..61cc1cad 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-instance.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-instance.yml
@@ -25,7 +25,7 @@ nodes:
   name: provider1
   serviceid: {{ b64enc "e2e-service-provider" }}.1
   servicename: e2e-service-provider
-  type: "Tomcat"
+  type: ""
   isreal: true
 {{- end }}
 calls:
diff --git 
a/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml 
b/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
index 11ba084e..106faf57 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
@@ -31,9 +31,11 @@ nodes:
 calls:
 {{- contains .calls }}
 - source: {{ b64enc "e2e-service-consumer"}}.1
-  sourcecomponents: []
+  sourcecomponents:
+    - SpringRestTemplate
   target: {{ b64enc "e2e-service-provider"}}.1
-  targetcomponents: []
+  targetcomponents:
+    - Tomcat
   id: {{ b64enc "e2e-service-consumer"}}.1-{{ b64enc "e2e-service-provider"}}.1
   detectpoints:
     - CLIENT
@@ -41,7 +43,8 @@ calls:
 - source: {{ b64enc "User" }}.0
   sourcecomponents: []
   target: {{ b64enc "e2e-service-consumer"}}.1
-  targetcomponents: []
+  targetcomponents:
+    - Tomcat
   id: {{ b64enc "User" }}.0-{{ b64enc "e2e-service-consumer"}}.1
   detectpoints:
     - SERVER
diff --git 
a/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml 
b/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
index 600dd6dd..63646974 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
@@ -31,15 +31,18 @@ nodes:
 calls:
 {{- contains .calls }}
 - source: {{ b64enc "e2e-service-consumer"}}.1
-  sourcecomponents: []
+  sourcecomponents:
+    - SpringRestTemplate
   target: {{ b64enc "e2e-service-provider"}}.1
-  targetcomponents: []
+  targetcomponents:
+    - Tomcat
   id: {{ b64enc "e2e-service-consumer"}}.1-{{ b64enc "e2e-service-provider"}}.1
   detectpoints:
     - CLIENT
     - SERVER
 - source: {{ b64enc "e2e-service-provider" }}.1
-  sourcecomponents: []
+  sourcecomponents:
+    - h2-jdbc-driver
   target: {{ b64enc "localhost:-1"}}.0
   targetcomponents: []
   id: {{ b64enc "e2e-service-provider" }}.1-{{ b64enc "localhost:-1"}}.0
diff --git 
a/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml 
b/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
index ef174902..c360120b 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
@@ -18,30 +18,60 @@
   value:
   {{- contains .value }}
   - key: {{ notEmpty .key }}
-    value: {{ ge .value 1 }}
+    value:
+      value: 0
+      isemptyvalue: true
+  - key: {{ notEmpty .key }}
+    value:
+      value: {{ ge .value.value 1 }}
+      isemptyvalue: false
   {{- end }}
 - key: 1
   value:
   {{- contains .value }}
   - key: {{ notEmpty .key }}
-    value: {{ ge .value 1 }}
+    value:
+      value: 0
+      isemptyvalue: true
+  - key: {{ notEmpty .key }}
+    value:
+      value: {{ ge .value.value 1 }}
+      isemptyvalue: false
   {{- end }}
 - key: 2
   value:
   {{- contains .value }}
   - key: {{ notEmpty .key }}
-    value: {{ ge .value 1 }}
+    value:
+      value: 0
+      isemptyvalue: true
+  - key: {{ notEmpty .key }}
+    value:
+      value: {{ ge .value.value 1 }}
+      isemptyvalue: false
   {{- end }}
 - key: 3
   value:
   {{- contains .value }}
   - key: {{ notEmpty .key }}
-    value: {{ ge .value 1 }}
+    value:
+      value: 0
+      isemptyvalue: true
+  - key: {{ notEmpty .key }}
+    value:
+      value: {{ ge .value.value 1 }}
+      isemptyvalue: false
   {{- end }}
 - key: 4
   value:
   {{- contains .value }}
   - key: {{ notEmpty .key }}
-    value: {{ ge .value 1 }}
+    value:
+      value: 0
+      isemptyvalue: true
+  - key: {{ notEmpty .key }}
+    value:
+      value: {{ ge .value.value 1 }}
+      isemptyvalue: false
   {{- end }}
 {{- end }}
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml 
b/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
index d9c49854..ceb5b007 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
@@ -15,5 +15,11 @@
 
 {{- contains . }}
 - key: {{ notEmpty .key }}
-  value: {{ ge .value 1 }}
+  value:
+    value: 0
+    isemptyvalue: true
+- key: {{ notEmpty .key }}
+  value:
+    value: {{ ge .value.value 1 }}
+    isemptyvalue: false
 {{- end }}
\ No newline at end of file
diff --git 
a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml 
b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
similarity index 60%
rename from 
test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml
rename to 
test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
index 7b223635..25b00113 100644
--- a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
@@ -13,28 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-spans:
-{{- contains .spans }}
-- spanid: 0
-  parentspanid: -1
-  servicecode: e2e-service-provider
-  serviceinstancename: ""
-  starttime: {{ gt .starttime 0 }}
-  endtime: {{ gt .endtime 0 }}
-  endpointname: POST:/profile/{name}
-  type: Entry
-  peer: ""
-  component: SpringMVC
-  iserror: false
-  layer: Http
-  tags:
-    {{- contains .tags }}
-    - key: url
-      value: {{ notEmpty .value }}
-    - key: http.method
-      value: POST
-    - key: http.params
-      value: "e2e=[true]"
-    {{- end }}
-  logs: []
-{{- end }}
+value: 0
+isemptyvalue: true
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml 
b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
similarity index 90%
copy from test/e2e-v2/cases/storage/expected/metrics-has-value.yml
copy to test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
index d9c49854..ee72d979 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
@@ -13,7 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{{- contains . }}
-- key: {{ notEmpty .key }}
-  value: {{ ge .value 1 }}
-{{- end }}
\ No newline at end of file
+value: 10000
+isemptyvalue: false
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml 
b/test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
similarity index 90%
copy from test/e2e-v2/cases/storage/expected/metrics-has-value.yml
copy to test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
index d9c49854..dd1d3a40 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
@@ -13,7 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{{- contains . }}
-- key: {{ notEmpty .key }}
-  value: {{ ge .value 1 }}
-{{- end }}
\ No newline at end of file
+10000
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ 
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9


Reply via email to