This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 1f5a645c Integrating Sharded Buffer into TSDB (#263)
1f5a645c is described below
commit 1f5a645ccd98576e425b71a2ce249bff0927f7b3
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Apr 6 22:39:49 2023 +0800
Integrating Sharded Buffer into TSDB (#263)
* Integrating Sharded Buffer into TSDB. The Sharded Buffer will replace
Badger's memtable for data ingestion.
Currently, Badger KV only provides SST.
* Sync e2e cases with the main repo.
* Fix buffer crashing
* Clear volume counter after flushing
* Fix unstopped loop
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 2 +-
banyand/Dockerfile | 2 +-
banyand/kv/badger.go | 5 +
banyand/kv/kv.go | 13 +-
banyand/measure/measure_write.go | 4 +-
banyand/metadata/schema/checker_test.go | 4 +-
banyand/metadata/schema/error.go | 4 +-
banyand/metadata/schema/etcd.go | 10 +-
banyand/tsdb/block.go | 165 +++++++++++++++------
banyand/tsdb/buffer.go | 82 +++++++---
banyand/tsdb/metric.go | 25 ++--
dist/LICENSE | 2 +-
go.mod | 2 +-
go.sum | 4 +-
pkg/index/inverted/inverted.go | 3 +
pkg/pb/v1/write.go | 2 +-
pkg/run/closer.go | 17 +++
pkg/schema/metadata.go | 3 +-
test/cases/measure/measure.go | 2 +-
.../ebpf/oncpu/expected/profiling-task-list.yml | 3 +
.../trace/expected/profile-segment-list.yml | 37 ++++-
.../cases/profiling/trace/profiling-cases.yaml | 24 +--
.../cases/storage/expected/dependency-instance.yml | 2 +-
.../expected/dependency-services-consumer.yml | 9 +-
.../expected/dependency-services-provider.yml | 9 +-
.../expected/metrics-has-value-percentile.yml | 40 ++++-
.../cases/storage/expected/metrics-has-value.yml | 8 +-
.../metrics-nullable-single-sla-empty.yml} | 27 +---
...s-value.yml => metrics-nullable-single-sla.yml} | 6 +-
...etrics-has-value.yml => metrics-single-sla.yml} | 5 +-
test/e2e-v2/script/env | 8 +-
31 files changed, 354 insertions(+), 175 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 17c09ea6..fc47300b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,7 +9,7 @@ Release Notes.
- Add TSDB concept document.
- [UI] Add YAML editor for inputting query criteria.
- Refactor TopN to support `NULL` group while keeping seriesID from the source
measure.
-- Add a sharded buffer to TSDB.
+- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only
provides SST.
### Chores
diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index 72cdbbcd..3b5af6ba 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -27,7 +27,7 @@ EXPOSE 2345
ENTRYPOINT ["air"]
-FROM golang:1.19 AS base
+FROM golang:1.20 AS base
ENV GOPATH "/go"
ENV GO111MODULE "on"
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/banyandb"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
dbOpts badger.Options
}
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+ return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
Reader
}
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
- // Put a value with a timestamp/version
- Put(key, val []byte, ts uint64) error
- // PutAsync a value with a timestamp/version asynchronously.
- // Injected "f" func will notice the result of value write.
- PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
// TimeSeriesReader allows retrieving data from a time-series storage.
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
type TimeSeriesStore interface {
observability.Observable
io.Closer
- TimeSeriesWriter
+ Handover(skl *skl.Skiplist) error
TimeSeriesReader
}
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options
...TimeSeriesOptions) (TimeSeriesS
if btss.dbOpts.MemTableSize < 8<<20 {
btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
}
+ btss.dbOpts = btss.dbOpts.WithInTable()
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index b3615741..e2a9f54c 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -185,9 +185,9 @@ func encodeFieldValue(fieldValue *modelv1.FieldValue)
[]byte {
case *modelv1.FieldValue_Float:
return convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
case *modelv1.FieldValue_Str:
- return []byte(fieldValue.GetStr().Value)
+ return []byte(fieldValue.GetStr().GetValue())
case *modelv1.FieldValue_BinaryData:
- return fieldValue.GetBinaryData()
+ return bytes.Clone(fieldValue.GetBinaryData())
}
return nil
}
diff --git a/banyand/metadata/schema/checker_test.go
b/banyand/metadata/schema/checker_test.go
index 2280d88c..d15f814d 100644
--- a/banyand/metadata/schema/checker_test.go
+++ b/banyand/metadata/schema/checker_test.go
@@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Utils", func() {
})
ginkgo.AfterEach(func() {
-
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})
ginkgo.It("should be equal if nothing changed", func() {
@@ -188,7 +188,7 @@ var _ = ginkgo.Describe("Utils", func() {
})
ginkgo.AfterEach(func() {
-
Eventually(gleak.Goroutines()).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
})
ginkgo.It("should be equal if nothing changed", func() {
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index 00e32eaa..f220c281 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -27,6 +27,8 @@ import (
var (
// ErrGRPCResourceNotFound indicates the resource doesn't exist.
ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
+ // ErrClosed indicates the registry is closed.
+ ErrClosed = errors.New("metadata registry is closed")
statusGRPCInvalidArgument = status.New(codes.InvalidArgument,
"banyandb: input is invalid")
statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb:
resource not found")
@@ -34,8 +36,6 @@ var (
errGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
statusDataLoss = status.New(codes.DataLoss, "banyandb:
resource corrupts.")
errGRPCDataLoss = statusDataLoss.Err()
-
- errClosed = errors.New("metadata registry is closed")
)
// BadRequest creates a gRPC error with error details with type BadRequest,
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 91b0b243..d87512de 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -200,7 +200,7 @@ func NewEtcdSchemaRegistry(options ...RegistryOption)
(Registry, error) {
func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message
proto.Message) error {
if !e.closer.AddRunning() {
- return errClosed
+ return ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, key)
@@ -229,7 +229,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key
string, message proto.
// Otherwise, it will return ErrGRPCResourceNotFound.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata)
error {
if !e.closer.AddRunning() {
- return errClosed
+ return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
@@ -281,7 +281,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context,
metadata Metadata) erro
// Otherwise, it will return ErrGRPCAlreadyExists.
func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata)
error {
if !e.closer.AddRunning() {
- return errClosed
+ return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
@@ -314,7 +314,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context,
metadata Metadata) erro
func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix
string, factory func() proto.Message) ([]proto.Message, error) {
if !e.closer.AddRunning() {
- return nil, errClosed
+ return nil, ErrClosed
}
defer e.closer.Done()
resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(),
clientv3.WithRange(incrementLastByte(prefix)))
@@ -343,7 +343,7 @@ func listPrefixesForEntity(group, entityPrefix string)
string {
func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata)
(bool, error) {
if !e.closer.AddRunning() {
- return false, errClosed
+ return false, ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..1357e458 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
"sync/atomic"
"time"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"go.uber.org/multierr"
@@ -48,40 +49,46 @@ const (
componentSecondInvertedIdx = "inverted"
componentSecondLSMIdx = "lsm"
- defaultMainMemorySize = 8 << 20
- defaultEnqueueTimeout = 500 * time.Millisecond
+ defaultBufferSize = 8 << 20
+ defaultEnqueueTimeout = 500 * time.Millisecond
+ maxBlockAge = time.Hour
+ defaultWriteConcurrency = 1000
+ defaultNumBufferShards = 2
)
var errBlockClosingInterrupted = errors.New("interrupt to close the block")
type block struct {
- store kv.TimeSeriesStore
- openOpts openOpts
- queue bucket.Queue
- bucket.Reporter
- clock timestamp.Clock
- lsmIndex index.Store
invertedIndex index.Store
- closed *atomic.Bool
- l *logger.Logger
- deleted *atomic.Bool
- ref *atomic.Int32
- position common.Position
+ sst kv.TimeSeriesStore
+ queue bucket.Queue
+ bucket.Reporter
+ clock timestamp.Clock
+ lsmIndex index.Store
+ closed *atomic.Bool
+ buffer *Buffer
+ l *logger.Logger
+ deleted *atomic.Bool
+ ref *atomic.Int32
+ closeBufferTimer *time.Timer
+ position common.Position
timestamp.TimeRange
segSuffix string
suffix string
path string
closableLst []io.Closer
+ openOpts openOpts
lock sync.RWMutex
segID SectionID
blockID SectionID
}
type openOpts struct {
- store []kv.TimeSeriesOptions
- storePath string
- inverted inverted.StoreOpts
- lsm lsm.StoreOpts
+ storePath string
+ inverted inverted.StoreOpts
+ lsm lsm.StoreOpts
+ store []kv.TimeSeriesOptions
+ bufferSize int64
}
type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
if options.CompressionMethod.Type == CompressionTypeZSTD {
b.openOpts.store = append(b.openOpts.store,
kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
}
- var memSize int64
+ var bufferSize int64
if options.BlockMemSize < 1 {
- memSize = defaultMainMemorySize
+ bufferSize = defaultBufferSize
} else {
- memSize = options.BlockMemSize
+ bufferSize = options.BlockMemSize
}
- b.openOpts.store = append(b.openOpts.store,
kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+ b.openOpts.bufferSize = bufferSize
+ b.openOpts.store = append(b.openOpts.store,
kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
b.openOpts.storePath = path.Join(b.path, componentMain)
b.openOpts.inverted = inverted.StoreOpts{
Path: path.Join(b.path, componentSecondInvertedIdx),
Logger: b.l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
- lsmMemSize := memSize / 8
+ lsmMemSize := bufferSize / 8
if lsmMemSize < defaultKVMemorySize {
lsmMemSize = defaultKVMemorySize
}
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
}
func (b *block) open() (err error) {
- if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath,
b.openOpts.store...); err != nil {
+ if b.isActive() {
+ if err = b.openBuffer(); err != nil {
+ return err
+ }
+ }
+ if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath,
b.openOpts.store...); err != nil {
return err
}
- b.closableLst = append(b.closableLst, b.store)
+ b.closableLst = append(b.closableLst, b.sst)
if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err
!= nil {
return err
}
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
return nil
}
+func (b *block) openBuffer() (err error) {
+ if b.buffer != nil {
+ return nil
+ }
+ if b.buffer, err = NewBuffer(b.l,
int(b.openOpts.bufferSize/defaultNumBufferShards),
+ defaultWriteConcurrency, defaultNumBufferShards, b.flush); err
!= nil {
+ return err
+ }
+ now := b.clock.Now()
+ max := b.maxTime()
+ closeAfter := max.Sub(now)
+ // TODO: we should move inactive write buffer to segment or shard.
+ if now.After(max) {
+ closeAfter = maxBlockAge
+ }
+ // create a timer to close buffer
+ b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+ if b.l.Debug().Enabled() {
+ b.l.Debug().Msg("closing buffer")
+ }
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ if b.buffer == nil {
+ return
+ }
+ if err := b.buffer.Close(); err != nil {
+ b.l.Error().Err(err).Msg("close buffer error")
+ }
+ b.buffer = nil
+ })
+ return nil
+}
+
+func (b *block) isActive() bool {
+ return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+ return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+ b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+ return b.sst.Handover(skl)
+}
+
func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
if b.deleted.Load() {
return nil, errors.WithMessagef(errBlockAbsent, "block %s is
deleted", b)
@@ -225,29 +284,15 @@ func (b *block) delegate(ctx context.Context)
(blockDelegate, error) {
}
func (b *block) incRef() bool {
-loop:
if b.Closed() {
return false
}
- r := b.ref.Load()
- if b.ref.CompareAndSwap(r, r+1) {
- return true
- }
- runtime.Gosched()
- goto loop
+ b.ref.Add(1)
+ return true
}
func (b *block) Done() {
-loop:
- r := b.ref.Load()
- if r < 1 {
- return
- }
- if b.ref.CompareAndSwap(r, r-1) {
- return
- }
- runtime.Gosched()
- goto loop
+ b.ref.Add(-1)
}
func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
@@ -284,6 +329,12 @@ func (b *block) close(ctx context.Context) (err error) {
case <-ch:
}
b.closed.Store(true)
+ if b.closeBufferTimer != nil {
+ b.closeBufferTimer.Stop()
+ }
+ if b.buffer != nil {
+ err = multierr.Append(err, b.buffer.Close())
+ }
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
@@ -308,15 +359,27 @@ func (b *block) String() string {
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
- names = append(names, componentMain, componentSecondInvertedIdx,
componentSecondLSMIdx)
if b.Closed() {
- stats = make([]observability.Statistics, 3)
+ stats = make([]observability.Statistics, 0)
return
}
- stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(),
b.lsmIndex.Stats())
+ bnn, bss := b.buffer.Stats()
+ if bnn != nil {
+ names = append(names, bnn...)
+ stats = append(stats, bss...)
+ }
+ names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+ stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
return names, stats
}
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+ if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+ return v, nil
+ }
+ return b.sst.Get(key, ts)
+}
+
type blockDelegate interface {
io.Closer
contains(ts time.Time) bool
@@ -340,7 +403,7 @@ type bDelegate struct {
}
func (d *bDelegate) dataReader() kv.TimeSeriesReader {
- return d.delegate.store
+ return d.delegate
}
func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +427,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID
SectionID) {
}
func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
- return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+ // On-demand open buffer
+ if d.delegate.buffer == nil {
+ d.delegate.lock.Lock()
+ if err := d.delegate.openBuffer(); err != nil {
+ d.delegate.lock.Unlock()
+ return err
+ }
+ d.delegate.lock.Unlock()
+ }
+ d.delegate.buffer.Write(key, val, ts)
+ return nil
}
func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID)
error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..75ba8e96 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -21,16 +21,21 @@ import (
"fmt"
"sync"
"time"
+ "unsafe"
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
-const defaultSize = 1 << 20 // 1MB
+const (
+ defaultSize = 1 << 20 // 1MB
+ nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
+)
type operation struct {
key []byte
@@ -53,8 +58,7 @@ type bufferShardBucket struct {
log *logger.Logger
immutables []*skl.Skiplist
index int
- flushSize int
- size int
+ capacity int
mutex sync.RWMutex
}
@@ -72,10 +76,6 @@ type Buffer struct {
// NewBuffer creates a new Buffer instance with the given parameters.
func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int,
onFlushFn onFlush) (*Buffer, error) {
- size := flushSize
- if size < defaultSize {
- size = defaultSize
- }
buckets := make([]bufferShardBucket, numShards)
buffer := &Buffer{
buckets: buckets,
@@ -89,9 +89,8 @@ func NewBuffer(log *logger.Logger, flushSize,
writeConcurrency, numShards int, o
for i := 0; i < numShards; i++ {
buckets[i] = bufferShardBucket{
index: i,
- size: size,
- mutable: skl.NewSkiplist(int64(size)),
- flushSize: flushSize,
+ capacity: flushSize,
+ mutable: skl.NewSkiplist(int64(flushSize)),
writeCh: make(chan operation, writeConcurrency),
flushCh: make(chan flushEvent, 1),
writeWaitGroup: &buffer.writeWaitGroup,
@@ -105,7 +104,7 @@ func NewBuffer(log *logger.Logger, flushSize,
writeConcurrency, numShards int, o
// Write adds a key-value pair with a timestamp to the appropriate shard
bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
- if !b.entryCloser.AddRunning() {
+ if b == nil || !b.entryCloser.AddRunning() {
return
}
defer b.entryCloser.Done()
@@ -119,10 +118,16 @@ func (b *Buffer) Write(key, value []byte, timestamp
time.Time) {
// Read retrieves the value associated with the given key and timestamp from
the appropriate shard bucket in the buffer.
func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+ if b == nil || !b.entryCloser.AddRunning() {
+ return nil, false
+ }
+ defer b.entryCloser.Done()
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
- for _, bk := range b.buckets[index].getAll() {
+ ll, deferFn := b.buckets[index].getAll()
+ defer deferFn()
+ for _, bk := range ll {
value := bk.Get(keyWithTS)
if value.Meta == 0 && value.Value == nil {
continue
@@ -157,11 +162,41 @@ func (b *Buffer) Close() error {
return nil
}
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+ if b == nil || !b.entryCloser.AddRunning() {
+ return nil, nil
+ }
+ names := make([]string, b.numShards)
+ stats := make([]observability.Statistics, b.numShards)
+ size := func(bucket *bufferShardBucket) (size int64, maxSize int64) {
+ ll, deferFn := bucket.getAll()
+ defer deferFn()
+ for _, l := range ll {
+ if l == nil {
+ continue
+ }
+ size += l.MemSize()
+ maxSize += int64(bucket.capacity)
+ }
+ return
+ }
+ for i := 0; i < b.numShards; i++ {
+ names[i] = fmt.Sprintf("buffer-%d", i)
+ size, maxSize := size(&b.buckets[i])
+ stats[i] = observability.Statistics{
+ MemBytes: size,
+ MaxMemBytes: maxSize,
+ }
+ }
+ return names, stats
+}
+
func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}
-func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
+func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
bsb.mutex.RLock()
defer bsb.mutex.RUnlock()
allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
@@ -172,7 +207,11 @@ func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
allList[i+1] = bsb.immutables[last-i]
bsb.immutables[last-i].IncrRef()
}
- return allList
+ return allList, func() {
+ for _, l := range allList {
+ l.DecrRef()
+ }
+ }
}
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
@@ -192,22 +231,27 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
}()
go func() {
defer bsb.writeWaitGroup.Done()
+ volume := 0
for op := range bsb.writeCh {
- bsb.mutex.Lock()
- if bsb.mutable.MemSize() >= int64(bsb.flushSize) {
+ k := y.KeyWithTs(op.key, op.epoch)
+ v := y.ValueStruct{Value: op.value}
+ volume += len(k) + int(v.EncodedSize()) +
skl.MaxNodeSize + nodeAlign
+ if volume >= bsb.capacity || bsb.mutable.MemSize() >=
int64(bsb.capacity) {
select {
case bsb.flushCh <- flushEvent{data:
bsb.mutable}:
default:
}
+ volume = 0
+ bsb.mutex.Lock()
bsb.swap()
+ bsb.mutex.Unlock()
}
- bsb.mutex.Unlock()
- bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch),
y.ValueStruct{Value: op.value, Version: op.epoch})
+ bsb.mutable.Put(k, v)
}
}()
}
func (bsb *bufferShardBucket) swap() {
bsb.immutables = append(bsb.immutables, bsb.mutable)
- bsb.mutable = skl.NewSkiplist(int64(bsb.size))
+ bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index 40340b53..bdd0d91f 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -18,8 +18,10 @@
package tsdb
import (
+ "fmt"
"time"
+ "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -50,17 +52,22 @@ func init() {
)
}
-func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
+func (s *shard) stat(_ time.Time, _ *logger.Logger) (r bool) {
+ r = true
defer func() {
if r := recover(); r != nil {
- s.l.Warn().Interface("r", r).Msg("recovered")
+ err, ok := r.(error)
+ if !ok {
+ err = fmt.Errorf("%v", r)
+ }
+ s.l.Warn().Err(errors.WithStack(err)).Msg("recovered")
}
}()
seriesStat := s.seriesDatabase.Stats()
s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
segStats := observability.Statistics{}
- blockStats := newBlockStat()
+ blockStats := make(map[string]observability.Statistics)
for _, seg := range s.segmentController.segments() {
segStat := seg.Stats()
segStats.MaxMemBytes += segStat.MaxMemBytes
@@ -75,6 +82,8 @@ func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
if ok {
bsc.MaxMemBytes += bs.MaxMemBytes
bsc.MemBytes += bs.MemBytes
+ } else {
+ blockStats[names[i]] = bs
}
}
}
@@ -85,7 +94,7 @@ func (s *shard) stat(_ time.Time, _ *logger.Logger) bool {
s.curry(mtBytes).WithLabelValues(name).Set(float64(bs.MemBytes))
s.curry(maxMtBytes).WithLabelValues(name).Set(float64(bs.MaxMemBytes))
}
- return true
+ return
}
func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec {
@@ -95,11 +104,3 @@ func (s *shard) curry(gv *prometheus.GaugeVec)
*prometheus.GaugeVec {
"shard": s.position.Shard,
})
}
-
-func newBlockStat() map[string]*observability.Statistics {
- return map[string]*observability.Statistics{
- componentMain: {},
- componentSecondInvertedIdx: {},
- componentSecondLSMIdx: {},
- }
-}
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
Apache-2.0 licenses
========================================================================
- github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+ github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
github.com/blevesearch/segment v0.9.0 Apache-2.0
github.com/blevesearch/vellum v1.0.7 Apache-2.0
github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
github.com/blugelabs/bluge_segment_api =>
github.com/zinclabs/bluge_segment_api v1.0.0
github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
- github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3
v3.0.0-20221227124922-b88a2f7d336f
+ github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3
v3.0.0-20230329010346-dfdf57f0581b
)
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2
h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.9.4
h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod
h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod
h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97
h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod
h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod
h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 3020f3c4..ff4cd4d4 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -165,6 +165,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange
index.RangeOpts, ord
if err != nil {
return nil, err
}
+ defer reader.Close()
fk := fieldKey.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
@@ -210,6 +211,7 @@ func (s *store) MatchTerms(field index.Field) (list
posting.List, err error) {
if err != nil {
return nil, err
}
+ defer reader.Close()
fk := field.Key.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
@@ -242,6 +244,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches
[]string) (posting.List,
if err != nil {
return nil, err
}
+ defer reader.Close()
analyzer := analyzers[fieldKey.Analyzer]
fk := fieldKey.MarshalIndexRule()
query := bluge.NewBooleanQuery()
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 600df45c..5ff2bb09 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -148,7 +148,7 @@ func ParseTagValue(tagValue *modelv1.TagValue) (TagValue,
error) {
}
return *fv, nil
case *modelv1.TagValue_BinaryData:
- return newValue(x.BinaryData), nil
+ return newValue(bytes.Clone(x.BinaryData)), nil
case *modelv1.TagValue_Id:
return newValue([]byte(x.Id.GetValue())), nil
}
diff --git a/pkg/run/closer.go b/pkg/run/closer.go
index 41d42053..5f718ee4 100644
--- a/pkg/run/closer.go
+++ b/pkg/run/closer.go
@@ -22,6 +22,8 @@ import (
"sync"
)
+var dummyCloserChan <-chan struct{}
+
// Closer can close a goroutine then wait for it to stop.
type Closer struct {
ctx context.Context
@@ -41,6 +43,9 @@ func NewCloser(initial int) *Closer {
// AddRunning adds a running task.
func (c *Closer) AddRunning() bool {
+ if c == nil {
+ return false
+ }
c.lock.RLock()
defer c.lock.RUnlock()
if c.closed {
@@ -52,16 +57,25 @@ func (c *Closer) AddRunning() bool {
// CloseNotify receives a signal from Close.
func (c *Closer) CloseNotify() <-chan struct{} {
+ if c == nil {
+ return dummyCloserChan
+ }
return c.ctx.Done()
}
// Done notifies that one task is done.
func (c *Closer) Done() {
+ if c == nil {
+ return
+ }
c.waiting.Done()
}
// CloseThenWait closes all tasks then waits till they are done.
func (c *Closer) CloseThenWait() {
+ if c == nil {
+ return
+ }
c.cancel()
c.lock.Lock()
c.closed = true
@@ -71,6 +85,9 @@ func (c *Closer) CloseThenWait() {
// Closed returns whether the Closer is closed.
func (c *Closer) Closed() bool {
+ if c == nil {
+ return true
+ }
c.lock.RLock()
defer c.lock.RUnlock()
return c.closed
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index c2f6a90c..103958d5 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -34,6 +34,7 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -196,7 +197,7 @@ func (sr *schemaRepo) Watcher() {
err =
sr.deleteResource(evt.Metadata)
}
}
- if err != nil {
+ if err != nil && !errors.Is(err,
schema.ErrClosed) {
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event.
retry...")
select {
case sr.eventCh <- evt:
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
}
)
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute,
Offset: -20 * time.Minute}),
g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
diff --git
a/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
b/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
index faa04bc5..f357fd6f 100644
--- a/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
+++ b/test/e2e-v2/cases/profiling/ebpf/oncpu/expected/profiling-task-list.yml
@@ -19,6 +19,8 @@
servicename: sqrt
serviceinstanceid: null
serviceinstancename: null
+ processid: null
+ processname: null
processlabels:
{{- contains .processlabels }}
- e2e-label1
@@ -29,4 +31,5 @@
fixedtriggerduration: 60
targettype: ON_CPU
createtime: {{ gt .createtime 0 }}
+ continuousprofilingcauses: []
{{- end }}
\ No newline at end of file
diff --git
a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
b/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
index a9230468..5c45b8e7 100644
--- a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
+++ b/test/e2e-v2/cases/profiling/trace/expected/profile-segment-list.yml
@@ -14,14 +14,39 @@
# limitations under the License.
{{- contains . }}
-- segmentid: {{ notEmpty .segmentid }}
+- traceid: {{ notEmpty .traceid }}
+ instanceid: {{ notEmpty .instanceid }}
+ instancename: provider1
endpointnames:
- POST:/profile/{name}
duration: {{ gt .duration 0 }}
start: "{{ notEmpty .start }}"
- iserror: false
- traceids:
- {{- contains .traceids }}
- - {{ notEmpty . }}
- {{- end }}
+ spans:
+ {{- contains .spans}}
+ - spanid: {{ ge .spanid 0 }}
+ parentspanid: {{ .parentspanid }}
+ segmentid: {{ notEmpty .segmentid }}
+ refs: []
+ servicecode: e2e-service-provider
+ serviceinstancename: provider1
+ starttime: {{ gt .starttime 0 }}
+ endtime: {{ gt .endtime 0 }}
+ endpointname: POST:/profile/{name}
+ type: Entry
+ peer: ""
+ component: SpringMVC
+ iserror: false
+ layer: Http
+ tags:
+ {{- contains .tags }}
+ - key: url
+ value: {{ notEmpty .value }}
+ - key: http.method
+ value: POST
+ - key: http.params
+ value: "e2e=[true]"
+ {{- end }}
+ logs: []
+ profiled: true
+ {{- end}}
{{- end }}
diff --git a/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
b/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
index 7a395c2a..69fc6557 100644
--- a/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
+++ b/test/e2e-v2/cases/profiling/trace/profiling-cases.yaml
@@ -55,22 +55,22 @@
swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e
'.[0].id' - \
)
expected: expected/profile-segment-list.yml
- # profiled segment detail
- - query: |
- swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace
profiled-segment --segment-id=$( \
- swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list
--task-id=$( \
- swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e
'.[0].id' - \
- ) | yq e '.[0].segmentid' - \
- )
- expected: expected/profile-segment-detail.yml
# query profiled segment analyze
- query: |
segmentid=$( \
swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list
--task-id=$( \
swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e
'.[0].id' - \
- ) | yq e '.[0].segmentid' - \
+ ) | yq e '.[0].spans.[] | select(.spanid == 0) | .segmentid' - \
+ );
+ start=$(
+ swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list
--task-id=$( \
+ swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e
'.[0].id' - \
+ ) | yq e '.[0].spans.[] | select(.spanid == 0) | .starttime' - \
+ );
+ end=$(
+ swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace segment-list
--task-id=$( \
+ swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace list
--service-name=e2e-service-provider --endpoint-name=POST:/profile/{name} | yq e
'.[0].id' - \
+ ) | yq e '.[0].spans.[] | select(.spanid == 0) | .endtime' - \
);
- start=$(swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace
profiled-segment --segment-id=$segmentid | yq e '.spans[] | select(.spanid ==
0).starttime' -);
- end=$(swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace
profiled-segment --segment-id=$segmentid | yq e '.spans[] | select(.spanid ==
0).endtime' -);
- swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace analysis
--segment-id=$segmentid --time-ranges=$(echo $start"-"$end)
+ swctl --display yaml
--base-url=http://${oap_host}:${oap_12800}/graphql profiling trace analysis
--segment-ids=$segmentid --time-ranges=$(echo $start"-"$end)
expected: expected/profile-segment-analyze.yml
diff --git a/test/e2e-v2/cases/storage/expected/dependency-instance.yml
b/test/e2e-v2/cases/storage/expected/dependency-instance.yml
index d3b9547c..61cc1cad 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-instance.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-instance.yml
@@ -25,7 +25,7 @@ nodes:
name: provider1
serviceid: {{ b64enc "e2e-service-provider" }}.1
servicename: e2e-service-provider
- type: "Tomcat"
+ type: ""
isreal: true
{{- end }}
calls:
diff --git
a/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
b/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
index 11ba084e..106faf57 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-services-consumer.yml
@@ -31,9 +31,11 @@ nodes:
calls:
{{- contains .calls }}
- source: {{ b64enc "e2e-service-consumer"}}.1
- sourcecomponents: []
+ sourcecomponents:
+ - SpringRestTemplate
target: {{ b64enc "e2e-service-provider"}}.1
- targetcomponents: []
+ targetcomponents:
+ - Tomcat
id: {{ b64enc "e2e-service-consumer"}}.1-{{ b64enc "e2e-service-provider"}}.1
detectpoints:
- CLIENT
@@ -41,7 +43,8 @@ calls:
- source: {{ b64enc "User" }}.0
sourcecomponents: []
target: {{ b64enc "e2e-service-consumer"}}.1
- targetcomponents: []
+ targetcomponents:
+ - Tomcat
id: {{ b64enc "User" }}.0-{{ b64enc "e2e-service-consumer"}}.1
detectpoints:
- SERVER
diff --git
a/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
b/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
index 600dd6dd..63646974 100644
--- a/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
+++ b/test/e2e-v2/cases/storage/expected/dependency-services-provider.yml
@@ -31,15 +31,18 @@ nodes:
calls:
{{- contains .calls }}
- source: {{ b64enc "e2e-service-consumer"}}.1
- sourcecomponents: []
+ sourcecomponents:
+ - SpringRestTemplate
target: {{ b64enc "e2e-service-provider"}}.1
- targetcomponents: []
+ targetcomponents:
+ - Tomcat
id: {{ b64enc "e2e-service-consumer"}}.1-{{ b64enc "e2e-service-provider"}}.1
detectpoints:
- CLIENT
- SERVER
- source: {{ b64enc "e2e-service-provider" }}.1
- sourcecomponents: []
+ sourcecomponents:
+ - h2-jdbc-driver
target: {{ b64enc "localhost:-1"}}.0
targetcomponents: []
id: {{ b64enc "e2e-service-provider" }}.1-{{ b64enc "localhost:-1"}}.0
diff --git
a/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
b/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
index ef174902..c360120b 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-has-value-percentile.yml
@@ -18,30 +18,60 @@
value:
{{- contains .value }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+ - key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
- key: 1
value:
{{- contains .value }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+ - key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
- key: 2
value:
{{- contains .value }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+ - key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
- key: 3
value:
{{- contains .value }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+ - key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
- key: 4
value:
{{- contains .value }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+ - key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
{{- end }}
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
b/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
index d9c49854..ceb5b007 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
@@ -15,5 +15,11 @@
{{- contains . }}
- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
+ value:
+ value: 0
+ isemptyvalue: true
+- key: {{ notEmpty .key }}
+ value:
+ value: {{ ge .value.value 1 }}
+ isemptyvalue: false
{{- end }}
\ No newline at end of file
diff --git
a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml
b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
similarity index 60%
rename from
test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml
rename to
test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
index 7b223635..25b00113 100644
--- a/test/e2e-v2/cases/profiling/trace/expected/profile-segment-detail.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla-empty.yml
@@ -13,28 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-spans:
-{{- contains .spans }}
-- spanid: 0
- parentspanid: -1
- servicecode: e2e-service-provider
- serviceinstancename: ""
- starttime: {{ gt .starttime 0 }}
- endtime: {{ gt .endtime 0 }}
- endpointname: POST:/profile/{name}
- type: Entry
- peer: ""
- component: SpringMVC
- iserror: false
- layer: Http
- tags:
- {{- contains .tags }}
- - key: url
- value: {{ notEmpty .value }}
- - key: http.method
- value: POST
- - key: http.params
- value: "e2e=[true]"
- {{- end }}
- logs: []
-{{- end }}
+value: 0
+isemptyvalue: true
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
similarity index 90%
copy from test/e2e-v2/cases/storage/expected/metrics-has-value.yml
copy to test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
index d9c49854..ee72d979 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-nullable-single-sla.yml
@@ -13,7 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{{- contains . }}
-- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
-{{- end }}
\ No newline at end of file
+value: 10000
+isemptyvalue: false
diff --git a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
b/test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
similarity index 90%
copy from test/e2e-v2/cases/storage/expected/metrics-has-value.yml
copy to test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
index d9c49854..dd1d3a40 100644
--- a/test/e2e-v2/cases/storage/expected/metrics-has-value.yml
+++ b/test/e2e-v2/cases/storage/expected/metrics-single-sla.yml
@@ -13,7 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{{- contains . }}
-- key: {{ notEmpty .key }}
- value: {{ ge .value 1 }}
-{{- end }}
\ No newline at end of file
+10000
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@
SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9