This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit df6f00ee73e52bab9aaa3e2c1dcd3898ad4bf9f0 Author: Gao Hongtao <[email protected]> AuthorDate: Mon May 10 20:58:06 2021 +0800 Add trace API to storage module Signed-off-by: Gao Hongtao <[email protected]> --- api/data/trace.go | 2 +- api/fbs/v1/database.fbs | 2 +- api/fbs/v1/trace.fbs | 2 + banyand/index/index.go | 2 +- banyand/internal/cmd/standalone.go | 4 +- banyand/liaison/grpc/grpc.go | 4 +- banyand/liaison/liaison.go | 2 +- banyand/queue/local.go | 10 +++ banyand/queue/queue.go | 7 +- banyand/storage/database.go | 106 ++++++++++++++++++++++- api/fbs/v1/trace.fbs => banyand/storage/kv/kv.go | 11 ++- banyand/storage/storage.go | 5 +- 12 files changed, 140 insertions(+), 17 deletions(-) diff --git a/api/data/trace.go b/api/data/trace.go index a86ffbd..b48ccf2 100644 --- a/api/data/trace.go +++ b/api/data/trace.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package trace +package data import ( "github.com/apache/skywalking-banyandb/api/common" diff --git a/api/fbs/v1/database.fbs b/api/fbs/v1/database.fbs index d9d98cc..a446e32 100644 --- a/api/fbs/v1/database.fbs +++ b/api/fbs/v1/database.fbs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -namespace database; +namespace v1; table Node { id:string; diff --git a/api/fbs/v1/trace.fbs b/api/fbs/v1/trace.fbs index eaef6a0..8f31592 100644 --- a/api/fbs/v1/trace.fbs +++ b/api/fbs/v1/trace.fbs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +namespace v1; + table Trace { } \ No newline at end of file diff --git a/banyand/index/index.go b/banyand/index/index.go index 717f71e..eb4706a 100644 --- a/banyand/index/index.go +++ b/banyand/index/index.go @@ -30,6 +30,6 @@ type Builder interface { run.PreRunner } -func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Pipeline) (Builder, error) { +func NewBuilder(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Builder, error) { return nil, nil } diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 5f3ac30..b9f6e90 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -49,11 +49,11 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal("failed to initiate service repository", logger.Error(err)) } - pipeline, err := queue.NewPipeline(ctx, repo) + pipeline, err := queue.NewQueue(ctx, repo) if err != nil { l.Fatal("failed to initiate data pipeline", logger.Error(err)) } - db, err := storage.NewDB(ctx, repo) + db, err := storage.NewDB(ctx, repo, pipeline) if err != nil { l.Fatal("failed to initiate database", logger.Error(err)) } diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go index f19dce3..cdf7e71 100644 --- a/banyand/liaison/grpc/grpc.go +++ b/banyand/liaison/grpc/grpc.go @@ -34,10 +34,10 @@ type Server struct { addr string log *logger.Logger ser *grpclib.Server - pipeline queue.Pipeline + pipeline queue.Queue } -func NewServer(ctx context.Context, pipeline queue.Pipeline) *Server { +func NewServer(ctx context.Context, pipeline queue.Queue) *Server { return &Server{pipeline: pipeline} } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index e13a83f..117b361 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -30,6 +30,6 @@ type Endpoint interface { run.Service } -func NewEndpoint(ctx context.Context, pipeline queue.Pipeline) (Endpoint, error) { +func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) { return grpc.NewServer(ctx, pipeline), nil } diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 0e99b90..aeabda5 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -44,6 +44,8 @@ type DataPublisher interface { var _ run.PreRunner = (*Local)(nil) var _ run.Config = (*Local)(nil) +var _ bus.Publisher = (*Local)(nil) +var _ bus.Subscriber = (*Local)(nil) type Local struct { logger *logger.Logger @@ -54,6 +56,14 @@ type Local struct { repo discovery.ServiceRepo } +func (e *Local) Subscribe(topic bus.Topic, listener bus.MessageListener) error { + panic("implement me") +} + +func (e *Local) Publish(topic bus.Topic, message ...bus.Message) error { + panic("implement me") +} + func (e *Local) FlagSet() *run.FlagSet { e.logger = logger.GetLogger(name) fs := run.NewFlagSet("storage") diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 43b0ebc..ff59cee 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -21,14 +21,17 @@ import ( "context" "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/internal/bus" "github.com/apache/skywalking-banyandb/pkg/run" ) -type Pipeline interface { +type Queue interface { run.Config run.PreRunner + bus.Subscriber + bus.Publisher } -func NewPipeline(ctx context.Context, repo discovery.ServiceRepo) (Pipeline, error) { +func NewQueue(ctx context.Context, repo discovery.ServiceRepo) (Queue, error) { return &Local{repo: repo}, nil } diff --git a/banyand/storage/database.go b/banyand/storage/database.go index 7cba5da..b8fa32b 100644 --- a/banyand/storage/database.go +++ b/banyand/storage/database.go @@ -18,16 +18,29 @@ package storage import ( + "fmt" + "io/ioutil" + "os" + "sync" + + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/data" "github.com/apache/skywalking-banyandb/api/event" "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/internal/bus" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/storage/kv" "github.com/apache/skywalking-banyandb/pkg/run" ) var _ Database = (*DB)(nil) type DB struct { - repo discovery.ServiceRepo + root string + shards int + repo discovery.ServiceRepo + q queue.Queue } func (d *DB) Name() string { @@ -35,7 +48,10 @@ func (d *DB) Name() string { } func (d *DB) FlagSet() *run.FlagSet { - return nil + fs := run.NewFlagSet("storage") + fs.StringVar(&d.root, "root-path", "", "the root path of database") + fs.IntVar(&d.shards, "shards", 1, "total shards size") + return fs } func (d *DB) Validate() error { @@ -43,5 +59,91 @@ func (d *DB) Validate() error { } func (d *DB) PreRun() error { + if err := d.init(); err != nil { + return fmt.Errorf("failed to initialize db: %v", err) + } + if err := d.start(); err != nil { + return fmt.Errorf("failed to start db: %v", err) + } return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), bus.NewMessage(1, event.NewShard())) } + +type segment struct { + lst []kv.Block + sync.Mutex +} + +func (s *segment) AddBlock(b kv.Block) { + s.Lock() + defer s.Unlock() + s.lst = append(s.lst, b) +} + +type shard struct { + id int + lst []*segment + sync.Mutex +} + +func (s *shard) newSeg() *segment { + s.Lock() + defer s.Unlock() + seg := &segment{} + s.lst = append(s.lst, seg) + return seg +} + +func (s *shard) init() error { + seg := s.newSeg() + b, err := kv.NewBlock() + if err != nil { + return fmt.Errorf("failed to create segment: %v", err) + } + seg.AddBlock(b) + return nil +} + +func (d *DB) init() (err error) { + if err = os.MkdirAll(d.root, os.ModeDir); err != nil { + return fmt.Errorf("failed to create %s: %v", d.root, err) + } + var isEmpty bool + if isEmpty, err = isEmptyDir(d.root); err != nil { + return fmt.Errorf("checking directory contents failed: %v", err) + } + if !isEmpty { + return nil + } + for i := 0; i < d.shards; i++ { + s := newShard(i) + err = multierr.Append(err, s.init()) + } + if err != nil { + return fmt.Errorf("failed to init shards: %v", err) + } + return nil +} + +func (d *DB) start() error { + return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d) +} + +func (d *DB) Rev(message bus.Message) { + _, ok := message.Data().(data.Trace) + if !ok { + return + } + //TODO: save data into target shard +} + +func newShard(id int) *shard { + return &shard{id: id} +} + +func isEmptyDir(name string) (bool, error) { + entries, err := ioutil.ReadDir(name) + if err != nil { + return false, err + } + return len(entries) == 0, nil +} diff --git a/api/fbs/v1/trace.fbs b/banyand/storage/kv/kv.go similarity index 90% copy from api/fbs/v1/trace.fbs copy to banyand/storage/kv/kv.go index eaef6a0..c145286 100644 --- a/api/fbs/v1/trace.fbs +++ b/banyand/storage/kv/kv.go @@ -15,6 +15,11 @@ // specific language governing permissions and limitations // under the License. -table Trace { - -} \ No newline at end of file +package kv + +type Block interface { +} + +func NewBlock() (Block, error) { + return nil, nil +} diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go index c78cde5..feb408c 100644 --- a/banyand/storage/storage.go +++ b/banyand/storage/storage.go @@ -21,6 +21,7 @@ import ( "context" "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -29,6 +30,6 @@ type Database interface { run.PreRunner } -func NewDB(ctx context.Context, repo discovery.ServiceRepo) (Database, error) { - return &DB{repo: repo}, nil +func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Database, error) { + return &DB{repo: repo, q: pipeline}, nil }
