(skywalking-banyandb) 01/01: Add more test cases

2024-04-03 Thread hanahmily
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch idx-gc
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 016c3f59fd9057a6bde3ef5e20d4ed31d6b1872d
Author: Gao Hongtao 
AuthorDate: Thu Apr 4 03:32:08 2024 +

Add more test cases

Signed-off-by: Gao Hongtao 
---
 banyand/internal/storage/index.go  | 253 +++--
 banyand/internal/storage/index_test.go |  86 ++-
 banyand/internal/storage/retention.go  |  12 +-
 banyand/internal/storage/segment.go|  16 +--
 banyand/internal/storage/storage.go|  10 ++
 pkg/fs/file_system.go  |   7 +-
 pkg/fs/local_file_system.go|  17 +--
 pkg/fs/local_file_system_test.go   |   2 +-
 8 files changed, 255 insertions(+), 148 deletions(-)

diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index f19fc166..7bc91fea 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -20,8 +20,11 @@ package storage
 import (
"context"
"fmt"
-   "os"
+   "path"
"path/filepath"
+   "sort"
+   "strconv"
+   "strings"
"sync"
"time"
 
@@ -47,19 +50,21 @@ func (d *database[T, O]) Lookup(ctx context.Context, series 
*pbv1.Series) (pbv1.
 }
 
 type seriesIndex struct {
-   l *logger.Logger
-   store index.SeriesStore
-   name  string
+   startTime time.Time
+   store index.SeriesStore
+   l *logger.Logger
+   path  string
 }
 
-func newSeriesIndex(ctx context.Context, path, name string, 
flushTimeoutSeconds int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, path string, startTime time.Time, 
flushTimeoutSeconds int64) (*seriesIndex, error) {
si := &seriesIndex{
-   name: name,
-   l:logger.Fetch(ctx, "series_index"),
+   path:  path,
+   startTime: startTime,
+   l: logger.Fetch(ctx, "series_index"),
}
var err error
if si.store, err = inverted.NewStore(inverted.StoreOpts{
-   Path: filepath.Join(path, name),
+   Path: path,
Logger:   si.l,
BatchWaitSec: flushTimeoutSeconds,
}); err != nil {
@@ -233,39 +238,67 @@ func (s *seriesIndex) Close() error {
 }
 
 type seriesIndexController[T TSTable, O any] struct {
-   ctx context.Context
-   hot *seriesIndex
-   standby *seriesIndex
-   timestamp.TimeRange
-   l*logger.Logger
-   opts TSDBOpts[T, O]
+   clock   timestamp.Clock
+   hot *seriesIndex
+   standby *seriesIndex
+   l   *logger.Logger
+   locationstring
+   optsTSDBOpts[T, O]
+   standbyLiveTime time.Duration
sync.RWMutex
 }
 
-func standard(t time.Time, unit IntervalUnit) time.Time {
-   switch unit {
-   case HOUR:
-   return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 
0, t.Location())
-   case DAY:
-   return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, 
t.Location())
-   }
-   panic("invalid interval unit")
-}
-
 func newSeriesIndexController[T TSTable, O any](
ctx context.Context,
opts TSDBOpts[T, O],
 ) (*seriesIndexController[T, O], error) {
-   var hpath, spath string
l := logger.Fetch(ctx, "seriesIndexController")
-   startTime := standard(time.Now(), opts.TTL.Unit)
-   endTime := startTime.Add(opts.TTL.estimatedDuration())
-   timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
-   location := filepath.Clean(opts.Location)
+   clock, ctx := timestamp.GetClock(ctx)
+   var standbyLiveTime time.Duration
+   switch opts.TTL.Unit {
+   case HOUR:
+   standbyLiveTime = time.Hour
+   case DAY:
+   standbyLiveTime = 24 * time.Hour
+   default:
+   }
+   sic := &seriesIndexController[T, O]{
+   opts:opts,
+   clock:   clock,
+   standbyLiveTime: standbyLiveTime,
+   location:filepath.Clean(opts.Location),
+   l:   l,
+   }
+   idxName, err := sic.loadIdx()
+   if err != nil {
+   return nil, err
+   }
+   switch len(idxName) {
+   case 0:
+   if sic.hot, err = sic.newIdx(ctx); err != nil {
+   return nil, err
+   }
+   case 1:
+   if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+   return nil, err
+   }
+   case 2:
+   if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+   return nil, err
+   }
+   if sic.standby, err = sic.openIdx(ct

(skywalking-banyandb) 01/01: Add more test cases

2024-04-03 Thread hanahmily
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace-test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3ae549e23b60d94fdf3f62c6203dec8dbfbbde8b
Author: Gao Hongtao 
AuthorDate: Wed Apr 3 07:28:17 2024 +

Add more test cases

Signed-off-by: Gao Hongtao 
---
 banyand/measure/block.go  |  5 ++-
 banyand/measure/block_metadata.go | 12 +-
 banyand/stream/block.go   |  5 ++-
 banyand/stream/block_metadata.go  | 11 -
 pkg/test/query/trace.go   | 79 +--
 test/stress/trace/trace_suite_test.go |  8 
 6 files changed, 110 insertions(+), 10 deletions(-)

diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index cf3c2eab..b4792d85 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -610,10 +610,13 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
}
bc.bm.field.columnMetadata = cfm
bc.bm.tagProjection = bc.tagProjection
-   tf := make(map[string]*dataBlock, len(bc.tagProjection))
+   var tf map[string]*dataBlock
for i := range bc.tagProjection {
for tfName, block := range bc.bm.tagFamilies {
if bc.tagProjection[i].Family == tfName {
+   if tf == nil {
+   tf = make(map[string]*dataBlock, 
len(bc.tagProjection))
+   }
tf[tfName] = block
}
}
diff --git a/banyand/measure/block_metadata.go 
b/banyand/measure/block_metadata.go
index 82e44ca1..6a066329 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -176,7 +176,7 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, 
error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal 
tagFamily dataBlock: %w", err)
}
-   bm.tagFamilies[convert.BytesToString(nameBytes)] = tf
+   bm.tagFamilies[string(nameBytes)] = tf
}
}
src, err = bm.field.unmarshal(src)
@@ -212,6 +212,13 @@ type blockMetadataArray struct {
arr []blockMetadata
 }
 
+func (bma *blockMetadataArray) reset() {
+   for i := range bma.arr {
+   bma.arr[i].reset()
+   }
+   bma.arr = bma.arr[:0]
+}
+
 var blockMetadataArrayPool sync.Pool
 
 func generateBlockMetadataArray() *blockMetadataArray {
@@ -223,7 +230,7 @@ func generateBlockMetadataArray() *blockMetadataArray {
 }
 
 func releaseBlockMetadataArray(bma *blockMetadataArray) {
-   bma.arr = bma.arr[:0]
+   bma.reset()
blockMetadataArrayPool.Put(bma)
 }
 
@@ -279,6 +286,7 @@ func unmarshalBlockMetadata(dst []blockMetadata, src 
[]byte) ([]blockMetadata, e
dst = append(dst, blockMetadata{})
}
bm := &dst[len(dst)-1]
+   bm.reset()
tail, err := bm.unmarshal(src)
if err != nil {
return dstOrig, fmt.Errorf("cannot unmarshal 
blockMetadata entries: %w", err)
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index e8f63200..63342b11 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -529,10 +529,13 @@ func (bc *blockCursor) copyTo(r *pbv1.StreamResult) {
 func (bc *blockCursor) loadData(tmpBlock *block) bool {
tmpBlock.reset()
bc.bm.tagProjection = bc.tagProjection
-   tf := make(map[string]*dataBlock, len(bc.tagProjection))
+   var tf map[string]*dataBlock
for i := range bc.tagProjection {
for tfName, block := range bc.bm.tagFamilies {
if bc.tagProjection[i].Family == tfName {
+   if tf == nil {
+   tf = make(map[string]*dataBlock, 
len(bc.tagProjection))
+   }
tf[tfName] = block
}
}
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 8c8e2771..3906f603 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -181,7 +181,7 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte, 
error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal 
tagFamily dataBlock: %w", err)
}
-   bm.tagFamilies[convert.BytesToString(nameBytes)] = tf
+   bm.tagFamilies[string(nameBytes)] = tf
}
}
if err != nil {
@@ -216,6 +216,13 @@ type blockMetadataArray struct {
arr []blockMetadata
 }
 
+func (bma *blockMetadataArray) reset() {
+   for i := range bma.a