This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch tsdb-load in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 24eb7f01417ace87ad6f5d2fefcc25594024fc31 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Jan 6 03:43:07 2022 +0000 Open the tsdb on a existing path --- banyand/tsdb/segment.go | 37 +++++++++++++++++++--------- banyand/tsdb/shard.go | 33 ++++++++++++++++++------- banyand/tsdb/tsdb.go | 61 +++++++++++++++++++++++++++++++++++++++++------ banyand/tsdb/tsdb_test.go | 28 ++++++++++++++++++---- 4 files changed, 127 insertions(+), 32 deletions(-) diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index b2dad50..a2450a4 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -63,20 +63,35 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) { if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil { return nil, err } - blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat)) - if err != nil { - return nil, err + loadBlock := func(path string) error { + var b *block + if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{ + path: path, + }); err != nil { + return err + } + { + s.Lock() + defer s.Unlock() + s.lst = append(s.lst, b) + } + return nil } - var b *block - if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{ - path: blockPath, - }); err != nil { + err = walkDir(path, blockPathPrefix, func(name, absolutePath string) error { + return loadBlock(absolutePath) + }) + if err != nil { return nil, err } - { - s.Lock() - defer s.Unlock() - s.lst = append(s.lst, b) + if len(s.lst) < 1 { + blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat)) + if err != nil { + return nil, err + } + err = loadBlock(blockPath) + if err != nil { + return nil, err + } } return s, nil } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 4bcc40e..451b530 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -49,23 +49,38 @@ func (s *shard) Index() IndexDatabase { return s.indexDatabase } -func newShard(ctx context.Context, id common.ShardID, location string) (*shard, error) { +func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) { s := &shard{ id: id, location: location, } - segPath, err := mkdir(segTemplate, location, time.Now().Format(segFormat)) - if err != nil { - return nil, err + loadSeg := func(path string) error { + seg, err := newSegment(ctx, path) + if err != nil { + return err + } + { + s.Lock() + defer s.Unlock() + s.lst = append(s.lst, seg) + } + return nil } - seg, err := newSegment(ctx, segPath) + err := walkDir(location, segPathPrefix, func(_, absolutePath string) error { + return loadSeg(absolutePath) + }) if err != nil { return nil, err } - { - s.Lock() - defer s.Unlock() - s.lst = append(s.lst, seg) + if len(s.lst) < 1 { + segPath, err := mkdir(segTemplate, location, time.Now().Format(segFormat)) + if err != nil { + return nil, err + } + err = loadSeg(segPath) + if err != nil { + return nil, err + } } seriesPath, err := mkdir(seriesTemplate, s.location) if err != nil { diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index e9ed19a..fbb710a 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -24,6 +24,8 @@ import ( "io/fs" "io/ioutil" "os" + "strconv" + "strings" "sync" "github.com/pkg/errors" @@ -36,11 +38,16 @@ import ( ) const ( - shardTemplate = "%s/shard-%d" - seriesTemplate = "%s/series" - segTemplate = "%s/seg-%s" - blockTemplate = "%s/block-%s" - globalIndexTemplate = "%s/index" + shardPathPrefix = "shard" + pathSeparator = string(os.PathSeparator) + rootPrefix = "%s" + pathSeparator + shardTemplate = rootPrefix + shardPathPrefix + "-%d" + seriesTemplate = rootPrefix + "series" + segPathPrefix = "seg" + segTemplate = rootPrefix + segPathPrefix + "-%s" + blockPathPrefix = "block" + blockTemplate = rootPrefix + blockPathPrefix + "-%s" + globalIndexTemplate = rootPrefix + "index" segFormat = "20060102" blockFormat = "1504" @@ -155,7 +162,7 @@ func createDatabase(ctx context.Context, db *database) (Database, error) { err = multierr.Append(err, errInternal) continue } - so, errNewShard := newShard(ctx, common.ShardID(i), shardLocation) + so, errNewShard := openShard(ctx, common.ShardID(i), shardLocation) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue @@ -166,10 +173,50 @@ func createDatabase(ctx context.Context, db *database) (Database, error) { } func loadDatabase(ctx context.Context, db *database) (Database, error) { - //TODO: load the existing database + //TODO: open the lock file + //TODO: open the manifest file + db.Lock() + defer db.Unlock() + err := walkDir(db.location, shardPathPrefix, func(name, absolutePath string) error { + shardSegs := strings.Split(name, "-") + shardID, err := strconv.Atoi(shardSegs[1]) + if err != nil { + return err + } + if shardID >= int(db.shardNum) { + return nil + } + so, errOpenShard := openShard(ctx, common.ShardID(shardID), absolutePath) + if errOpenShard != nil { + return errOpenShard + } + db.sLst = append(db.sLst, so) + return nil + }) + if err != nil { + return nil, errors.WithMessage(err, "load the database failed") + } return db, nil } +type walkFn func(name, absolutePath string) error + +func walkDir(root, prefix string, walkFn walkFn) error { + files, err := ioutil.ReadDir(root) + if err != nil { + return errors.Wrapf(err, "failed to walk the database path: %s", root) + } + for _, f := range files { + if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) { + continue + } + if walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()) != nil { + return errors.WithMessagef(err, "failed to load: %s", f.Name()) + } + } + return nil +} + func mkdir(format string, a ...interface{}) (path string, err error) { path = fmt.Sprintf(format, a...) if err = os.MkdirAll(path, dirPerm); err != nil { diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index c570fdf..bfb9957 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -34,8 +34,27 @@ import ( func TestOpenDatabase(t *testing.T) { tester := assert.New(t) - tempDir, deferFunc, _ := setUp(require.New(t)) + req := require.New(t) + tempDir, deferFunc := test.Space(req) + openDatabase(req, tempDir) defer deferFunc() + verifyDatabaseStructure(tester, tempDir) +} + +func TestReOpenDatabase(t *testing.T) { + tester := assert.New(t) + req := require.New(t) + tempDir, deferFunc := test.Space(req) + defer deferFunc() + db := openDatabase(req, tempDir) + req.NoError(db.Close()) + verifyDatabaseStructure(tester, tempDir) + db = openDatabase(req, tempDir) + req.NoError(db.Close()) + verifyDatabaseStructure(tester, tempDir) +} + +func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) { shardPath := fmt.Sprintf(shardTemplate, tempDir, 0) validateDirectory(tester, shardPath) seriesPath := fmt.Sprintf(seriesTemplate, shardPath) @@ -46,16 +65,15 @@ func TestOpenDatabase(t *testing.T) { validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat))) } -func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database) { +func openDatabase(t *require.Assertions, path string) (db Database) { t.NoError(logger.Init(logger.Logging{ Env: "dev", Level: "warn", })) - tempDir, deferFunc = test.Space(t) db, err := OpenDatabase( context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")), DatabaseOpts{ - Location: tempDir, + Location: path, ShardNum: 1, EncodingMethod: EncodingMethod{ EncoderPool: encoding.NewPlainEncoderPool(0), @@ -64,7 +82,7 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database }) t.NoError(err) t.NotNil(db) - return tempDir, deferFunc, db + return db } func validateDirectory(t *assert.Assertions, dir string) {
