This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-load
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 24eb7f01417ace87ad6f5d2fefcc25594024fc31
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jan 6 03:43:07 2022 +0000

    Open the tsdb on a existing path
---
 banyand/tsdb/segment.go   | 37 +++++++++++++++++++---------
 banyand/tsdb/shard.go     | 33 ++++++++++++++++++-------
 banyand/tsdb/tsdb.go      | 61 +++++++++++++++++++++++++++++++++++++++++------
 banyand/tsdb/tsdb_test.go | 28 ++++++++++++++++++----
 4 files changed, 127 insertions(+), 32 deletions(-)

diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b2dad50..a2450a4 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -63,20 +63,35 @@ func newSegment(ctx context.Context, path string) (s 
*segment, err error) {
        if s.globalIndex, err = kv.OpenStore(0, indexPath, 
kv.StoreWithLogger(s.l)); err != nil {
                return nil, err
        }
-       blockPath, err := mkdir(blockTemplate, path, 
time.Now().Format(blockFormat))
-       if err != nil {
-               return nil, err
+       loadBlock := func(path string) error {
+               var b *block
+               if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, 
s.l), blockOpts{
+                       path: path,
+               }); err != nil {
+                       return err
+               }
+               {
+                       s.Lock()
+                       defer s.Unlock()
+                       s.lst = append(s.lst, b)
+               }
+               return nil
        }
-       var b *block
-       if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), 
blockOpts{
-               path: blockPath,
-       }); err != nil {
+       err = walkDir(path, blockPathPrefix, func(name, absolutePath string) 
error {
+               return loadBlock(absolutePath)
+       })
+       if err != nil {
                return nil, err
        }
-       {
-               s.Lock()
-               defer s.Unlock()
-               s.lst = append(s.lst, b)
+       if len(s.lst) < 1 {
+               blockPath, err := mkdir(blockTemplate, path, 
time.Now().Format(blockFormat))
+               if err != nil {
+                       return nil, err
+               }
+               err = loadBlock(blockPath)
+               if err != nil {
+                       return nil, err
+               }
        }
        return s, nil
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4bcc40e..451b530 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -49,23 +49,38 @@ func (s *shard) Index() IndexDatabase {
        return s.indexDatabase
 }
 
-func newShard(ctx context.Context, id common.ShardID, location string) 
(*shard, error) {
+func openShard(ctx context.Context, id common.ShardID, location string) 
(*shard, error) {
        s := &shard{
                id:       id,
                location: location,
        }
-       segPath, err := mkdir(segTemplate, location, 
time.Now().Format(segFormat))
-       if err != nil {
-               return nil, err
+       loadSeg := func(path string) error {
+               seg, err := newSegment(ctx, path)
+               if err != nil {
+                       return err
+               }
+               {
+                       s.Lock()
+                       defer s.Unlock()
+                       s.lst = append(s.lst, seg)
+               }
+               return nil
        }
-       seg, err := newSegment(ctx, segPath)
+       err := walkDir(location, segPathPrefix, func(_, absolutePath string) 
error {
+               return loadSeg(absolutePath)
+       })
        if err != nil {
                return nil, err
        }
-       {
-               s.Lock()
-               defer s.Unlock()
-               s.lst = append(s.lst, seg)
+       if len(s.lst) < 1 {
+               segPath, err := mkdir(segTemplate, location, 
time.Now().Format(segFormat))
+               if err != nil {
+                       return nil, err
+               }
+               err = loadSeg(segPath)
+               if err != nil {
+                       return nil, err
+               }
        }
        seriesPath, err := mkdir(seriesTemplate, s.location)
        if err != nil {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index e9ed19a..fbb710a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -24,6 +24,8 @@ import (
        "io/fs"
        "io/ioutil"
        "os"
+       "strconv"
+       "strings"
        "sync"
 
        "github.com/pkg/errors"
@@ -36,11 +38,16 @@ import (
 )
 
 const (
-       shardTemplate       = "%s/shard-%d"
-       seriesTemplate      = "%s/series"
-       segTemplate         = "%s/seg-%s"
-       blockTemplate       = "%s/block-%s"
-       globalIndexTemplate = "%s/index"
+       shardPathPrefix     = "shard"
+       pathSeparator       = string(os.PathSeparator)
+       rootPrefix          = "%s" + pathSeparator
+       shardTemplate       = rootPrefix + shardPathPrefix + "-%d"
+       seriesTemplate      = rootPrefix + "series"
+       segPathPrefix       = "seg"
+       segTemplate         = rootPrefix + segPathPrefix + "-%s"
+       blockPathPrefix     = "block"
+       blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
+       globalIndexTemplate = rootPrefix + "index"
 
        segFormat   = "20060102"
        blockFormat = "1504"
@@ -155,7 +162,7 @@ func createDatabase(ctx context.Context, db *database) 
(Database, error) {
                        err = multierr.Append(err, errInternal)
                        continue
                }
-               so, errNewShard := newShard(ctx, common.ShardID(i), 
shardLocation)
+               so, errNewShard := openShard(ctx, common.ShardID(i), 
shardLocation)
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -166,10 +173,50 @@ func createDatabase(ctx context.Context, db *database) 
(Database, error) {
 }
 
 func loadDatabase(ctx context.Context, db *database) (Database, error) {
-       //TODO: load the existing database
+       //TODO: open the lock file
+       //TODO: open the manifest file
+       db.Lock()
+       defer db.Unlock()
+       err := walkDir(db.location, shardPathPrefix, func(name, absolutePath 
string) error {
+               shardSegs := strings.Split(name, "-")
+               shardID, err := strconv.Atoi(shardSegs[1])
+               if err != nil {
+                       return err
+               }
+               if shardID >= int(db.shardNum) {
+                       return nil
+               }
+               so, errOpenShard := openShard(ctx, common.ShardID(shardID), 
absolutePath)
+               if errOpenShard != nil {
+                       return errOpenShard
+               }
+               db.sLst = append(db.sLst, so)
+               return nil
+       })
+       if err != nil {
+               return nil, errors.WithMessage(err, "load the database failed")
+       }
        return db, nil
 }
 
+type walkFn func(name, absolutePath string) error
+
+func walkDir(root, prefix string, walkFn walkFn) error {
+       files, err := ioutil.ReadDir(root)
+       if err != nil {
+               return errors.Wrapf(err, "failed to walk the database path: 
%s", root)
+       }
+       for _, f := range files {
+               if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
+                       continue
+               }
+               if walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()) != 
nil {
+                       return errors.WithMessagef(err, "failed to load: %s", 
f.Name())
+               }
+       }
+       return nil
+}
+
 func mkdir(format string, a ...interface{}) (path string, err error) {
        path = fmt.Sprintf(format, a...)
        if err = os.MkdirAll(path, dirPerm); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index c570fdf..bfb9957 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -34,8 +34,27 @@ import (
 
 func TestOpenDatabase(t *testing.T) {
        tester := assert.New(t)
-       tempDir, deferFunc, _ := setUp(require.New(t))
+       req := require.New(t)
+       tempDir, deferFunc := test.Space(req)
+       openDatabase(req, tempDir)
        defer deferFunc()
+       verifyDatabaseStructure(tester, tempDir)
+}
+
+func TestReOpenDatabase(t *testing.T) {
+       tester := assert.New(t)
+       req := require.New(t)
+       tempDir, deferFunc := test.Space(req)
+       defer deferFunc()
+       db := openDatabase(req, tempDir)
+       req.NoError(db.Close())
+       verifyDatabaseStructure(tester, tempDir)
+       db = openDatabase(req, tempDir)
+       req.NoError(db.Close())
+       verifyDatabaseStructure(tester, tempDir)
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
        shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
        validateDirectory(tester, shardPath)
        seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
@@ -46,16 +65,15 @@ func TestOpenDatabase(t *testing.T) {
        validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, 
now.Format(blockFormat)))
 }
 
-func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db 
Database) {
+func openDatabase(t *require.Assertions, path string) (db Database) {
        t.NoError(logger.Init(logger.Logging{
                Env:   "dev",
                Level: "warn",
        }))
-       tempDir, deferFunc = test.Space(t)
        db, err := OpenDatabase(
                context.WithValue(context.Background(), logger.ContextKey, 
logger.GetLogger("test")),
                DatabaseOpts{
-                       Location: tempDir,
+                       Location: path,
                        ShardNum: 1,
                        EncodingMethod: EncodingMethod{
                                EncoderPool: encoding.NewPlainEncoderPool(0),
@@ -64,7 +82,7 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc 
func(), db Database
                })
        t.NoError(err)
        t.NotNil(db)
-       return tempDir, deferFunc, db
+       return db
 }
 
 func validateDirectory(t *assert.Assertions, dir string) {

Reply via email to