(skywalking-banyandb) 01/01: Add more test cases
This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch idx-gc in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git commit 016c3f59fd9057a6bde3ef5e20d4ed31d6b1872d Author: Gao Hongtao AuthorDate: Thu Apr 4 03:32:08 2024 + Add more test cases Signed-off-by: Gao Hongtao --- banyand/internal/storage/index.go | 253 +++-- banyand/internal/storage/index_test.go | 86 ++- banyand/internal/storage/retention.go | 12 +- banyand/internal/storage/segment.go| 16 +-- banyand/internal/storage/storage.go| 10 ++ pkg/fs/file_system.go | 7 +- pkg/fs/local_file_system.go| 17 +-- pkg/fs/local_file_system_test.go | 2 +- 8 files changed, 255 insertions(+), 148 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index f19fc166..7bc91fea 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -20,8 +20,11 @@ package storage import ( "context" "fmt" - "os" + "path" "path/filepath" + "sort" + "strconv" + "strings" "sync" "time" @@ -47,19 +50,21 @@ func (d *database[T, O]) Lookup(ctx context.Context, series *pbv1.Series) (pbv1. } type seriesIndex struct { - l *logger.Logger - store index.SeriesStore - name string + startTime time.Time + store index.SeriesStore + l *logger.Logger + path string } -func newSeriesIndex(ctx context.Context, path, name string, flushTimeoutSeconds int64) (*seriesIndex, error) { +func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flushTimeoutSeconds int64) (*seriesIndex, error) { si := &seriesIndex{ - name: name, - l:logger.Fetch(ctx, "series_index"), + path: path, + startTime: startTime, + l: logger.Fetch(ctx, "series_index"), } var err error if si.store, err = inverted.NewStore(inverted.StoreOpts{ - Path: filepath.Join(path, name), + Path: path, Logger: si.l, BatchWaitSec: flushTimeoutSeconds, }); err != nil { @@ -233,39 +238,67 @@ func (s *seriesIndex) Close() error { } type seriesIndexController[T TSTable, O any] struct { - ctx context.Context - hot *seriesIndex - standby *seriesIndex - timestamp.TimeRange - l*logger.Logger - opts TSDBOpts[T, O] + clock timestamp.Clock + hot *seriesIndex + standby *seriesIndex + l *logger.Logger + locationstring + optsTSDBOpts[T, O] + standbyLiveTime time.Duration sync.RWMutex } -func standard(t time.Time, unit IntervalUnit) time.Time { - switch unit { - case HOUR: - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) - case DAY: - return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) - } - panic("invalid interval unit") -} - func newSeriesIndexController[T TSTable, O any]( ctx context.Context, opts TSDBOpts[T, O], ) (*seriesIndexController[T, O], error) { - var hpath, spath string l := logger.Fetch(ctx, "seriesIndexController") - startTime := standard(time.Now(), opts.TTL.Unit) - endTime := startTime.Add(opts.TTL.estimatedDuration()) - timeRange := timestamp.NewSectionTimeRange(startTime, endTime) - location := filepath.Clean(opts.Location) + clock, ctx := timestamp.GetClock(ctx) + var standbyLiveTime time.Duration + switch opts.TTL.Unit { + case HOUR: + standbyLiveTime = time.Hour + case DAY: + standbyLiveTime = 24 * time.Hour + default: + } + sic := &seriesIndexController[T, O]{ + opts:opts, + clock: clock, + standbyLiveTime: standbyLiveTime, + location:filepath.Clean(opts.Location), + l: l, + } + idxName, err := sic.loadIdx() + if err != nil { + return nil, err + } + switch len(idxName) { + case 0: + if sic.hot, err = sic.newIdx(ctx); err != nil { + return nil, err + } + case 1: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + case 2: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + if sic.standby, err = sic.openIdx(ct
(skywalking-banyandb) 01/01: Add more test cases
This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace-test in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git commit 3ae549e23b60d94fdf3f62c6203dec8dbfbbde8b Author: Gao Hongtao AuthorDate: Wed Apr 3 07:28:17 2024 + Add more test cases Signed-off-by: Gao Hongtao --- banyand/measure/block.go | 5 ++- banyand/measure/block_metadata.go | 12 +- banyand/stream/block.go | 5 ++- banyand/stream/block_metadata.go | 11 - pkg/test/query/trace.go | 79 +-- test/stress/trace/trace_suite_test.go | 8 6 files changed, 110 insertions(+), 10 deletions(-) diff --git a/banyand/measure/block.go b/banyand/measure/block.go index cf3c2eab..b4792d85 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -610,10 +610,13 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { } bc.bm.field.columnMetadata = cfm bc.bm.tagProjection = bc.tagProjection - tf := make(map[string]*dataBlock, len(bc.tagProjection)) + var tf map[string]*dataBlock for i := range bc.tagProjection { for tfName, block := range bc.bm.tagFamilies { if bc.tagProjection[i].Family == tfName { + if tf == nil { + tf = make(map[string]*dataBlock, len(bc.tagProjection)) + } tf[tfName] = block } } diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index 82e44ca1..6a066329 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -176,7 +176,7 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err) } - bm.tagFamilies[convert.BytesToString(nameBytes)] = tf + bm.tagFamilies[string(nameBytes)] = tf } } src, err = bm.field.unmarshal(src) @@ -212,6 +212,13 @@ type blockMetadataArray struct { arr []blockMetadata } +func (bma *blockMetadataArray) reset() { + for i := range bma.arr { + bma.arr[i].reset() + } + bma.arr = bma.arr[:0] +} + var blockMetadataArrayPool sync.Pool func generateBlockMetadataArray() *blockMetadataArray { @@ -223,7 +230,7 @@ func generateBlockMetadataArray() *blockMetadataArray { } func releaseBlockMetadataArray(bma *blockMetadataArray) { - bma.arr = bma.arr[:0] + bma.reset() blockMetadataArrayPool.Put(bma) } @@ -279,6 +286,7 @@ func unmarshalBlockMetadata(dst []blockMetadata, src []byte) ([]blockMetadata, e dst = append(dst, blockMetadata{}) } bm := &dst[len(dst)-1] + bm.reset() tail, err := bm.unmarshal(src) if err != nil { return dstOrig, fmt.Errorf("cannot unmarshal blockMetadata entries: %w", err) diff --git a/banyand/stream/block.go b/banyand/stream/block.go index e8f63200..63342b11 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -529,10 +529,13 @@ func (bc *blockCursor) copyTo(r *pbv1.StreamResult) { func (bc *blockCursor) loadData(tmpBlock *block) bool { tmpBlock.reset() bc.bm.tagProjection = bc.tagProjection - tf := make(map[string]*dataBlock, len(bc.tagProjection)) + var tf map[string]*dataBlock for i := range bc.tagProjection { for tfName, block := range bc.bm.tagFamilies { if bc.tagProjection[i].Family == tfName { + if tf == nil { + tf = make(map[string]*dataBlock, len(bc.tagProjection)) + } tf[tfName] = block } } diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go index 8c8e2771..3906f603 100644 --- a/banyand/stream/block_metadata.go +++ b/banyand/stream/block_metadata.go @@ -181,7 +181,7 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal tagFamily dataBlock: %w", err) } - bm.tagFamilies[convert.BytesToString(nameBytes)] = tf + bm.tagFamilies[string(nameBytes)] = tf } } if err != nil { @@ -216,6 +216,13 @@ type blockMetadataArray struct { arr []blockMetadata } +func (bma *blockMetadataArray) reset() { + for i := range bma.a