This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c2eeca  Add trace API to storage module
0c2eeca is described below

commit 0c2eecac1bbb92d1a9ebb07ca1b701723d8fa541
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon May 10 20:58:06 2021 +0800

    Add trace API to storage module
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/data/trace.go                                |   2 +-
 api/fbs/v1/database.fbs                          |   2 +-
 api/fbs/v1/trace.fbs                             |   2 +
 banyand/index/index.go                           |   2 +-
 banyand/internal/cmd/standalone.go               |   4 +-
 banyand/liaison/grpc/grpc.go                     |   4 +-
 banyand/liaison/liaison.go                       |   2 +-
 banyand/queue/local.go                           |  10 +++
 banyand/queue/queue.go                           |   7 +-
 banyand/storage/database.go                      | 109 ++++++++++++++++++++++-
 api/fbs/v1/trace.fbs => banyand/storage/kv/kv.go |  10 ++-
 banyand/storage/storage.go                       |   5 +-
 12 files changed, 142 insertions(+), 17 deletions(-)

diff --git a/api/data/trace.go b/api/data/trace.go
index a86ffbd..b48ccf2 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package trace
+package data
 
 import (
        "github.com/apache/skywalking-banyandb/api/common"
diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs
index d9d98cc..a446e32 100644
--- a/api/fbs/v1/database.fbs
+++ b/api/fbs/v1/database.fbs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-namespace database;
+namespace v1;
 
 table Node {
     id:string;
diff --git a/api/fbs/v1/trace.fbs b/api/fbs/v1/trace.fbs
index eaef6a0..8f31592 100644
--- a/api/fbs/v1/trace.fbs
+++ b/api/fbs/v1/trace.fbs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+namespace v1;
+
 table Trace {
     
 }
\ No newline at end of file
diff --git a/banyand/index/index.go b/banyand/index/index.go
index 717f71e..eb4706a 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -30,6 +30,6 @@ type Builder interface {
        run.PreRunner
 }
 
-func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline 
queue.Pipeline) (Builder, error) {
+func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline 
queue.Queue) (Builder, error) {
        return nil, nil
 }
diff --git a/banyand/internal/cmd/standalone.go 
b/banyand/internal/cmd/standalone.go
index 5f3ac30..b9f6e90 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -49,11 +49,11 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal("failed to initiate service repository", 
logger.Error(err))
        }
-       pipeline, err := queue.NewPipeline(ctx, repo)
+       pipeline, err := queue.NewQueue(ctx, repo)
        if err != nil {
                l.Fatal("failed to initiate data pipeline", logger.Error(err))
        }
-       db, err := storage.NewDB(ctx, repo)
+       db, err := storage.NewDB(ctx, repo, pipeline)
        if err != nil {
                l.Fatal("failed to initiate database", logger.Error(err))
        }
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index f19dce3..cdf7e71 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -34,10 +34,10 @@ type Server struct {
        addr     string
        log      *logger.Logger
        ser      *grpclib.Server
-       pipeline queue.Pipeline
+       pipeline queue.Queue
 }
 
-func NewServer(ctx context.Context, pipeline queue.Pipeline) *Server {
+func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
        return &Server{pipeline: pipeline}
 }
 
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index e13a83f..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -30,6 +30,6 @@ type Endpoint interface {
        run.Service
 }
 
-func NewEndpoint(ctx context.Context, pipeline queue.Pipeline) (Endpoint, 
error) {
+func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
        return grpc.NewServer(ctx, pipeline), nil
 }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 0e99b90..aeabda5 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -44,6 +44,8 @@ type DataPublisher interface {
 
 var _ run.PreRunner = (*Local)(nil)
 var _ run.Config = (*Local)(nil)
+var _ bus.Publisher = (*Local)(nil)
+var _ bus.Subscriber = (*Local)(nil)
 
 type Local struct {
        logger  *logger.Logger
@@ -54,6 +56,14 @@ type Local struct {
        repo    discovery.ServiceRepo
 }
 
+func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error 
{
+       panic("implement me")
+}
+
+func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error {
+       panic("implement me")
+}
+
 func (e *Local) FlagSet() *run.FlagSet {
        e.logger = logger.GetLogger(name)
        fs := run.NewFlagSet("storage")
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 43b0ebc..ff59cee 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -21,14 +21,17 @@ import (
        "context"
 
        "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/internal/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Pipeline interface {
+type Queue interface {
        run.Config
        run.PreRunner
+       bus.Subscriber
+       bus.Publisher
 }
 
-func NewPipeline(ctx context.Context, repo discovery.ServiceRepo) (Pipeline, 
error) {
+func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) {
        return &Local{repo: repo}, nil
 }
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index 7cba5da..126fa42 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -18,16 +18,29 @@
 package storage
 
 import (
+       "fmt"
+       "io/ioutil"
+       "os"
+       "sync"
+
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/api/data"
        "github.com/apache/skywalking-banyandb/api/event"
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/storage/kv"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var _ Database = (*DB)(nil)
 
 type DB struct {
-       repo discovery.ServiceRepo
+       root   string
+       shards int
+       repo   discovery.ServiceRepo
+       q      queue.Queue
 }
 
 func (d *DB) Name() string {
@@ -35,7 +48,10 @@ func (d *DB) Name() string {
 }
 
 func (d *DB) FlagSet() *run.FlagSet {
-       return nil
+       fs := run.NewFlagSet("storage")
+       fs.StringVar(&d.root, "root-path", "", "the root path of database")
+       fs.IntVar(&d.shards, "shards", 1, "total shards size")
+       return fs
 }
 
 func (d *DB) Validate() error {
@@ -43,5 +59,94 @@ func (d *DB) Validate() error {
 }
 
 func (d *DB) PreRun() error {
+       if err := d.init(); err != nil {
+               return fmt.Errorf("failed to initialize db: %v", err)
+       }
+       if err := d.start(); err != nil {
+               return fmt.Errorf("failed to start db: %v", err)
+       }
        return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), 
bus.NewMessage(1, event.NewShard()))
 }
+
+type Block interface {
+}
+
+type segment struct {
+       lst []Block
+       sync.Mutex
+}
+
+func (s *segment) AddBlock(b Block) {
+       s.Lock()
+       defer s.Unlock()
+       s.lst = append(s.lst, b)
+}
+
+type shard struct {
+       id  int
+       lst []*segment
+       sync.Mutex
+}
+
+func (s *shard) newSeg() *segment {
+       s.Lock()
+       defer s.Unlock()
+       seg := &segment{}
+       s.lst = append(s.lst, seg)
+       return seg
+}
+
+func (s *shard) init() error {
+       seg := s.newSeg()
+       b, err := kv.Block()
+       if err != nil {
+               return fmt.Errorf("failed to create segment: %v", err)
+       }
+       seg.AddBlock(b)
+       return nil
+}
+
+func (d *DB) init() (err error) {
+       if err = os.MkdirAll(d.root, os.ModeDir); err != nil {
+               return fmt.Errorf("failed to create %s: %v", d.root, err)
+       }
+       var isEmpty bool
+       if isEmpty, err = isEmptyDir(d.root); err != nil {
+               return fmt.Errorf("checking directory contents failed: %v", err)
+       }
+       if !isEmpty {
+               return nil
+       }
+       for i := 0; i < d.shards; i++ {
+               s := newShard(i)
+               err = multierr.Append(err, s.init())
+       }
+       if err != nil {
+               return fmt.Errorf("failed to init shards: %v", err)
+       }
+       return nil
+}
+
+func (d *DB) start() error {
+       return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d)
+}
+
+func (d *DB) Rev(message bus.Message) {
+       _, ok := message.Data().(data.Trace)
+       if !ok {
+               return
+       }
+       //TODO: save data into target shard
+}
+
+func newShard(id int) *shard {
+       return &shard{id: id}
+}
+
+func isEmptyDir(name string) (bool, error) {
+       entries, err := ioutil.ReadDir(name)
+       if err != nil {
+               return false, err
+       }
+       return len(entries) == 0, nil
+}
diff --git a/api/fbs/v1/trace.fbs b/banyand/storage/kv/kv.go
similarity index 86%
copy from api/fbs/v1/trace.fbs
copy to banyand/storage/kv/kv.go
index eaef6a0..af8447e 100644
--- a/api/fbs/v1/trace.fbs
+++ b/banyand/storage/kv/kv.go
@@ -15,6 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-table Trace {
-    
-}
\ No newline at end of file
+package kv
+
+import "github.com/apache/skywalking-banyandb/banyand/storage"
+
+func Block() (storage.Block, error) {
+       return nil, nil
+}
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index c78cde5..feb408c 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -21,6 +21,7 @@ import (
        "context"
 
        "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -29,6 +30,6 @@ type Database interface {
        run.PreRunner
 }
 
-func NewDB(ctx context.Context, repo discovery.ServiceRepo) (Database, error) {
-       return &DB{repo: repo}, nil
+func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline 
queue.Queue) (Database, error) {
+       return &DB{repo: repo, q: pipeline}, nil
 }

Reply via email to