This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b6a6af  Introduce run module to control the lifecycle of components
8b6a6af is described below

commit 8b6a6afa108ec425211c8ef4b4f39bc81e634a6b
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Apr 14 19:39:57 2021 +0800

    Introduce run module to control the lifecycle of components
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/executor/executor.go       |  46 +++--
 banyand/index/index.go             |  32 ++-
 banyand/internal/bus/bus.go        |  23 +--
 banyand/internal/cmd/standalone.go |  76 +++----
 banyand/series/series.go           |  32 ++-
 banyand/shard/shard.go             |  43 ++--
 banyand/storage/pipeline.go        |  61 ++++++
 go.mod                             |   3 +-
 go.sum                             |   2 +
 pkg/logger/logger.go               |  26 +--
 pkg/logger/setting.go              |   4 +-
 pkg/logger/setting_test.go         |   6 +-
 pkg/run/run.go                     | 391 +++++++++++++++++++++++++++++++++++++
 pkg/signal/handler.go              |  70 +++++++
 pkg/version/version.go             |  38 ++++
 15 files changed, 725 insertions(+), 128 deletions(-)

diff --git a/banyand/executor/executor.go b/banyand/executor/executor.go
index ac25e9e..bedc415 100644
--- a/banyand/executor/executor.go
+++ b/banyand/executor/executor.go
@@ -23,29 +23,47 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/bus"
        "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Executor)(nil)
+const name = "executor"
+
+var (
+       _ bus.MessageListener    = (*Executor)(nil)
+       _ run.PreRunner          = (*Executor)(nil)
+       _ storage.DataSubscriber = (*Executor)(nil)
+       _ storage.DataPublisher  = (*Executor)(nil)
+)
 
 type Executor struct {
-       log *logger.Logger
-       bus *bus.Bus
+       log       *logger.Logger
+       publisher bus.Publisher
 }
 
-func NewExecutor(bus *bus.Bus) *Executor {
-       return &Executor{
-               bus: bus,
-               log: logger.GetLogger("executor"),
-       }
+func (s *Executor) Pub(publisher bus.Publisher) error {
+       s.publisher = publisher
+       return nil
 }
 
-func (s Executor) Rev(message bus.Message) {
-       s.log.Info("rev", logger.Any("msg", message.Data()))
-       _ = s.bus.Publish(storage.TraceIndex, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
-       _ = s.bus.Publish(storage.TraceData, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+func (s *Executor) ComponentName() string {
+       return name
+}
+
+func (s *Executor) Sub(subscriber bus.Subscriber) error {
+       return subscriber.Subscribe(storage.TraceRaw, s)
 }
 
-func (s Executor) Close() error {
-       s.log.Info("closed")
+func (s *Executor) Name() string {
+       return name
+}
+
+func (s *Executor) PreRun() error {
+       s.log = logger.GetLogger(name)
        return nil
 }
+
+func (s Executor) Rev(message bus.Message) {
+       s.log.Info("rev", logger.Any("msg", message.Data()))
+       _ = s.publisher.Publish(storage.TraceIndex, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "index message"))
+       _ = s.publisher.Publish(storage.TraceData, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "data message"))
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index d29eb89..496e145 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -19,26 +19,40 @@ package index
 
 import (
        "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+       "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Index)(nil)
+const name = "index"
+
+var (
+       _ bus.MessageListener    = (*Index)(nil)
+       _ run.PreRunner          = (*Index)(nil)
+       _ storage.DataSubscriber = (*Index)(nil)
+)
 
 type Index struct {
        log *logger.Logger
 }
 
-func NewIndex() *Index {
-       return &Index{
-               log: logger.GetLogger("Index"),
-       }
+func (s *Index) ComponentName() string {
+       return name
 }
 
-func (s Index) Rev(message bus.Message) {
-       s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Index) Sub(subscriber bus.Subscriber) error {
+       return subscriber.Subscribe(storage.TraceIndex, s)
+}
+
+func (s Index) Name() string {
+       return name
 }
 
-func (s Index) Close() error {
-       s.log.Info("closed")
+func (s Index) PreRun() error {
+       s.log = logger.GetLogger(name)
        return nil
 }
+
+func (s Index) Rev(message bus.Message) {
+       s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/internal/bus/bus.go b/banyand/internal/bus/bus.go
index e2699fa..05f2065 100644
--- a/banyand/internal/bus/bus.go
+++ b/banyand/internal/bus/bus.go
@@ -19,10 +19,7 @@ package bus
 
 import (
        "errors"
-       "io"
        "sync"
-
-       "go.uber.org/atomic"
 )
 
 // Payload represents a simple data
@@ -43,10 +40,17 @@ func NewMessage(id MessageID, data interface{}) Message {
        return Message{id: id, payload: data}
 }
 
-// EventListener is the signature of functions that can handle an EventMessage.
+//MessageListener is the signature of functions that can handle an 
EventMessage.
 type MessageListener interface {
        Rev(message Message)
-       io.Closer
+}
+
+type Subscriber interface {
+       Subscribe(topic Topic, listener MessageListener) error
+}
+
+type Publisher interface {
+       Publish(topic Topic, message ...Message) error
 }
 
 type Channel chan Message
@@ -56,7 +60,6 @@ type Topic string
 // The Bus allows publish-subscribe-style communication between components
 type Bus struct {
        topics map[Topic][]Channel
-       closed atomic.Bool
        mutex  sync.RWMutex
 }
 
@@ -70,16 +73,12 @@ var (
        ErrTopicEmpty    = errors.New("the topic is empty")
        ErrTopicNotExist = errors.New("the topic does not exist")
        ErrListenerEmpty = errors.New("the message listener is empty")
-       ErrClosed        = errors.New("the bus is closed")
 )
 
 func (b *Bus) Publish(topic Topic, message ...Message) error {
        if topic == "" {
                return ErrTopicEmpty
        }
-       if b.closed.Load() {
-               return ErrClosed
-       }
        cc, exit := b.topics[topic]
        if !exit {
                return ErrTopicNotExist
@@ -104,9 +103,6 @@ func (b *Bus) Subscribe(topic Topic, listener 
MessageListener) error {
        if listener == nil {
                return ErrListenerEmpty
        }
-       if b.closed.Load() {
-               return ErrClosed
-       }
        b.mutex.Lock()
        defer b.mutex.Unlock()
        if _, exist := b.topics[topic]; !exist {
@@ -122,7 +118,6 @@ func (b *Bus) Subscribe(topic Topic, listener 
MessageListener) error {
                        if ok {
                                listener.Rev(c)
                        } else {
-                               _ = listener.Close()
                                break
                        }
                }
diff --git a/banyand/internal/cmd/standalone.go 
b/banyand/internal/cmd/standalone.go
index 27ca159..8e0ea44 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -18,26 +18,26 @@
 package cmd
 
 import (
-       "context"
        "os"
-       "os/signal"
-       "syscall"
 
        "github.com/spf13/cobra"
-       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/banyand/config"
-       "github.com/apache/skywalking-banyandb/banyand/executor"
-       "github.com/apache/skywalking-banyandb/banyand/index"
-       "github.com/apache/skywalking-banyandb/banyand/internal/bus"
-       "github.com/apache/skywalking-banyandb/banyand/series"
-       "github.com/apache/skywalking-banyandb/banyand/shard"
+       executor2 "github.com/apache/skywalking-banyandb/banyand/executor"
+       index2 "github.com/apache/skywalking-banyandb/banyand/index"
+       series2 "github.com/apache/skywalking-banyandb/banyand/series"
+       shard2 "github.com/apache/skywalking-banyandb/banyand/shard"
        "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/signal"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var standAloneConfig config.Standalone
+var (
+       standAloneConfig config.Standalone
+       g                = run.Group{Name: "standalone"}
+)
 
 func newStandaloneCmd() *cobra.Command {
        standaloneCmd := &cobra.Command{
@@ -55,38 +55,38 @@ func newStandaloneCmd() *cobra.Command {
                },
                RunE: func(cmd *cobra.Command, args []string) (err error) {
                        logger.GetLogger().Info("starting as a standalone 
server")
-                       dataBus := bus.NewBus()
-                       err = multierr.Append(err, 
dataBus.Subscribe(storage.TraceRaw, shard.NewShard(dataBus)))
-                       err = multierr.Append(err, 
dataBus.Subscribe(storage.TraceSharded, executor.NewExecutor(dataBus)))
-                       err = multierr.Append(err, 
dataBus.Subscribe(storage.TraceIndex, index.NewIndex()))
-                       err = multierr.Append(err, 
dataBus.Subscribe(storage.TraceData, series.NewSeries()))
-                       if err != nil {
-                               return err
-                       }
-                       if err = dataBus.Publish(storage.TraceRaw, 
bus.NewMessage(0, "initialization")); err != nil {
-                               return err
+                       engine := new(storage.Pipeline)
+                       shard := new(shard2.Shard)
+                       executor := new(executor2.Executor)
+                       index := new(index2.Index)
+                       series := new(series2.Series)
+
+                       // Register the storage engine components.
+                       engine.Register(
+                               shard,
+                               executor,
+                               index,
+                               series,
+                       )
+
+                       // Register the run Group units.
+                       g.Register(
+                               new(signal.Handler),
+                               engine,
+                               shard,
+                               executor,
+                               index,
+                               series,
+                       )
+
+                       // Spawn our go routines and wait for shutdown.
+                       if err := g.Run(args...); err != nil {
+                               logger.GetLogger().Error("exit: ", 
logger.String("name", g.Name), logger.Error(err))
+                               os.Exit(-1)
                        }
-                       ctx := newContext()
-                       <-ctx.Done()
                        return nil
                },
        }
 
        return standaloneCmd
 }
-
-func newContext() context.Context {
-       c := make(chan os.Signal, 1)
-       signal.Notify(c, os.Interrupt, syscall.SIGTERM)
-       ctx, cancel := context.WithCancel(context.Background())
-       go func() {
-               defer cancel()
-               select {
-               case <-ctx.Done():
-                       return
-               case <-c:
-                       return
-               }
-       }()
-       return ctx
-}
diff --git a/banyand/series/series.go b/banyand/series/series.go
index a51a2a1..850e9b1 100644
--- a/banyand/series/series.go
+++ b/banyand/series/series.go
@@ -19,26 +19,40 @@ package series
 
 import (
        "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+       "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Series)(nil)
+const name = "series"
+
+var (
+       _ bus.MessageListener    = (*Series)(nil)
+       _ run.PreRunner          = (*Series)(nil)
+       _ storage.DataSubscriber = (*Series)(nil)
+)
 
 type Series struct {
        log *logger.Logger
 }
 
-func NewSeries() *Series {
-       return &Series{
-               log: logger.GetLogger("series"),
-       }
+func (s Series) ComponentName() string {
+       return name
 }
 
-func (s Series) Rev(message bus.Message) {
-       s.log.Info("rev", logger.Any("msg", message.Data()))
+func (s *Series) Sub(subscriber bus.Subscriber) error {
+       return subscriber.Subscribe(storage.TraceData, s)
+}
+
+func (s Series) Name() string {
+       return name
 }
 
-func (s Series) Close() error {
-       s.log.Info("closed")
+func (s *Series) PreRun() error {
+       s.log = logger.GetLogger(name)
        return nil
 }
+
+func (s Series) Rev(message bus.Message) {
+       s.log.Info("rev", logger.Any("msg", message.Data()))
+}
diff --git a/banyand/shard/shard.go b/banyand/shard/shard.go
index a25baff..24c191c 100644
--- a/banyand/shard/shard.go
+++ b/banyand/shard/shard.go
@@ -23,28 +23,45 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/bus"
        "github.com/apache/skywalking-banyandb/banyand/storage"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ bus.MessageListener = (*Shard)(nil)
+var (
+       _ bus.MessageListener    = (*Shard)(nil)
+       _ run.PreRunner          = (*Shard)(nil)
+       _ storage.DataSubscriber = (*Shard)(nil)
+       _ storage.DataPublisher  = (*Shard)(nil)
+)
 
 type Shard struct {
-       log *logger.Logger
-       bus *bus.Bus
+       log       *logger.Logger
+       publisher bus.Publisher
 }
 
-func NewShard(bus *bus.Bus) *Shard {
-       return &Shard{
-               bus: bus,
-               log: logger.GetLogger("shard"),
-       }
+func (s Shard) ComponentName() string {
+       return "shard"
 }
 
-func (s Shard) Rev(message bus.Message) {
-       s.log.Info("rev", logger.Any("msg", message.Data()))
-       _ = s.bus.Publish(storage.TraceSharded, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+func (s *Shard) Pub(publisher bus.Publisher) error {
+       s.publisher = publisher
+       return nil
+}
+
+func (s *Shard) Sub(subscriber bus.Subscriber) error {
+       return subscriber.Subscribe(storage.TraceRaw, s)
 }
 
-func (s Shard) Close() error {
-       s.log.Info("closed")
+func (s *Shard) PreRun() error {
+       s.log = logger.GetLogger("shard")
+       s.log.Info("pre running")
        return nil
 }
+
+func (s *Shard) Name() string {
+       return "shard"
+}
+
+func (s Shard) Rev(message bus.Message) {
+       s.log.Info("rev", logger.Any("msg", message.Data()))
+       _ = s.publisher.Publish(storage.TraceSharded, 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), "sharded message"))
+}
diff --git a/banyand/storage/pipeline.go b/banyand/storage/pipeline.go
index eb30eab..6f51c84 100644
--- a/banyand/storage/pipeline.go
+++ b/banyand/storage/pipeline.go
@@ -17,9 +17,70 @@
 
 package storage
 
+import (
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
 const (
        TraceRaw     = "trace-raw"
        TraceSharded = "trace-sharded"
        TraceIndex   = "trace-index"
        TraceData    = "trace-data"
 )
+
+const name = "storage-engine"
+
+type Component interface {
+       ComponentName() string
+}
+
+type DataSubscriber interface {
+       Component
+       Sub(subscriber bus.Subscriber) error
+}
+
+type DataPublisher interface {
+       Component
+       Pub(publisher bus.Publisher) error
+}
+
+var _ run.PreRunner = (*Pipeline)(nil)
+
+type Pipeline struct {
+       logger  *logger.Logger
+       dataBus *bus.Bus
+       dps     []DataPublisher
+       dss     []DataSubscriber
+}
+
+func (e Pipeline) Name() string {
+       return name
+}
+
+func (e *Pipeline) PreRun() error {
+       e.logger = logger.GetLogger(name)
+       var err error
+       e.dataBus = bus.NewBus()
+       for _, dp := range e.dps {
+               err = multierr.Append(err, dp.Pub(e.dataBus))
+       }
+       for _, ds := range e.dss {
+               err = multierr.Append(err, ds.Sub(e.dataBus))
+       }
+       return err
+}
+
+func (e *Pipeline) Register(component ...Component) {
+       for _, c := range component {
+               if ds, ok := c.(DataSubscriber); ok {
+                       e.dss = append(e.dss, ds)
+               }
+               if ps, ok := c.(DataPublisher); ok {
+                       e.dps = append(e.dps, ps)
+               }
+       }
+}
diff --git a/go.mod b/go.mod
index fd684ee..0927f71 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,11 @@ module github.com/apache/skywalking-banyandb
 go 1.16
 
 require (
+       github.com/oklog/run v1.1.0
        github.com/spf13/cobra v1.1.3
+       github.com/spf13/pflag v1.0.5
        github.com/spf13/viper v1.7.1
        github.com/stretchr/testify v1.4.0
-       go.uber.org/atomic v1.7.0
        go.uber.org/multierr v1.6.0
        go.uber.org/zap v1.16.0
 )
diff --git a/go.sum b/go.sum
index 5d102e4..e426a98 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.1/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod 
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
+github.com/oklog/run v1.1.0/go.mod 
h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/oklog/ulid v1.3.1/go.mod 
h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod 
h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0 
h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1429a5a..8616612 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -25,31 +25,7 @@ import (
 // Logger is wrapper for zap logger with module, it is singleton.
 type Logger struct {
        module string
-       logger *zap.Logger
-}
-
-// Debug logs a message at DebugLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Debug(msg string, fields ...zap.Field) {
-       l.logger.Debug(msg, fields...)
-}
-
-// Info logs a message at InfoLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Info(msg string, fields ...zap.Field) {
-       l.logger.Info(msg, fields...)
-}
-
-// Warn logs a message at WarnLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Warn(msg string, fields ...zap.Field) {
-       l.logger.Warn(msg, fields...)
-}
-
-// Error logs a message at ErrorLevel. The message includes any fields passed
-// at the log site, as well as any fields accumulated on the logger.
-func (l *Logger) Error(msg string, fields ...zap.Field) {
-       l.logger.Error(msg, fields...)
+       *zap.Logger
 }
 
 // String constructs a field with the given key and value.
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 5fb7dbd..bae8827 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -38,7 +38,7 @@ func GetLogger(scope ...string) *Logger {
                return root
        }
        module := strings.Join(scope, ".")
-       return &Logger{module: module, logger: root.logger.Named(module)}
+       return &Logger{module: module, Logger: root.Logger.Named(module)}
 }
 
 // InitLogger initializes a zap logger from user config
@@ -73,5 +73,5 @@ func getLogger(cfg config.Logging) (*Logger, error) {
        if err != nil {
                return nil, err
        }
-       return &Logger{module: "root", logger: l}, nil
+       return &Logger{module: "root", Logger: l}, nil
 }
diff --git a/pkg/logger/setting_test.go b/pkg/logger/setting_test.go
index d065af2..784ef4a 100644
--- a/pkg/logger/setting_test.go
+++ b/pkg/logger/setting_test.go
@@ -81,10 +81,10 @@ func TestInitLogger(t *testing.T) {
                        }
                        if err == nil {
                                assert.NotNil(t, logger)
-                               assert.NotNil(t, logger.logger)
+                               assert.NotNil(t, logger.Logger)
                                assert.NotEmpty(t, logger.module)
-                               assert.Equal(t, tt.want.isDev, 
reflect.ValueOf(*logger.logger).FieldByName("development").Bool())
-                               assert.NotNil(t, 
logger.logger.Check(tt.want.level, "foo"))
+                               assert.Equal(t, tt.want.isDev, 
reflect.ValueOf(*logger.Logger).FieldByName("development").Bool())
+                               assert.NotNil(t, 
logger.Logger.Check(tt.want.level, "foo"))
                        }
                })
        }
diff --git a/pkg/run/run.go b/pkg/run/run.go
new file mode 100644
index 0000000..ccee525
--- /dev/null
+++ b/pkg/run/run.go
@@ -0,0 +1,391 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+       "fmt"
+       "os"
+       "path"
+
+       "github.com/oklog/run"
+       "github.com/spf13/pflag"
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// FlagSet holds a pflag.FlagSet as well as an exported Name variable for
+// allowing improved help usage information.
+type FlagSet struct {
+       *pflag.FlagSet
+       Name string
+}
+
+// NewFlagSet returns a new FlagSet for usage in Config objects.
+func NewFlagSet(name string) *FlagSet {
+       return &FlagSet{
+               FlagSet: pflag.NewFlagSet(name, pflag.ContinueOnError),
+               Name:    name,
+       }
+}
+
+// Unit is the default interface an object needs to implement for it to be able
+// to register with a Group.
+// Name should return a short but good identifier of the Unit.
+type Unit interface {
+       Name() string
+}
+
+// Config interface should be implemented by Group Unit objects that manage
+// their own configuration through the use of flags.
+// If a Unit's Validate returns an error it will stop the Group immediately.
+type Config interface {
+       //Unit for Group registration and identification
+       Unit
+       //FlagSet returns an object's FlagSet
+       FlagSet() *FlagSet
+       //Validate checks an object's stored values
+       Validate() error
+}
+
+// PreRunner interface should be implemented by Group Unit objects that need
+// a pre run stage before starting the Group Services.
+// If a Unit's PreRun returns an error it will stop the Group immediately.
+type PreRunner interface {
+       //Unit for Group registration and identification
+       Unit
+       PreRun() error
+}
+
+// NewPreRunner takes a name and a standalone pre runner compatible function
+// and turns them into a Group compatible PreRunner, ready for registration.
+func NewPreRunner(name string, fn func() error) PreRunner {
+       return preRunner{name: name, fn: fn}
+}
+
+type preRunner struct {
+       name string
+       fn   func() error
+}
+
+func (p preRunner) Name() string {
+       return p.name
+}
+
+func (p preRunner) PreRun() error {
+       return p.fn()
+}
+
+// Service interface should be implemented by Group Unit objects that need
+// to run a blocking service until an error occurs or a shutdown request is
+// made.
+// The Serve method must be blocking and return an error on unexpected 
shutdown.
+// Recoverable errors need to be handled inside the service itself.
+// GracefulStop must gracefully stop the service and make the Serve call 
return.
+//
+// Since Service is managed by Group, it is considered a design flaw to call 
any
+// of the Service methods directly in application code.
+type Service interface {
+       // Unit for Group registration and identification
+       Unit
+       // Serve starts the GroupService and blocks.
+       Serve() error
+       // GracefulStop shuts down and cleans up the GroupService.
+       GracefulStop()
+}
+
+// Group builds on https://github.com/oklog/run to provide a deterministic way
+// to manage service lifecycles. It allows for easy composition of elegant
+// monoliths as well as adding signal handlers, metrics services, etc.
+type Group struct {
+       // Name of the Group managed service. If omitted, the binaryname will be
+       // used as found at runtime.
+       Name string
+
+       f   *FlagSet
+       r   run.Group
+       c   []Config
+       p   []PreRunner
+       s   []Service
+       log *logger.Logger
+
+       configured   bool
+       hsRegistered bool
+}
+
+// Register will inspect the provided objects implementing the Unit interface 
to
+// see if it needs to register the objects for any of the Group bootstrap
+// phases. If a Unit doesn't satisfy any of the bootstrap phases it is ignored
+// by Group.
+// The returned array of booleans is of the same size as the amount of provided
+// Units, signalling for each provided Unit if it successfully registered with
+// Group for at least one of the bootstrap phases or if it was ignored.
+func (g *Group) Register(units ...Unit) []bool {
+       hasRegistered := make([]bool, len(units))
+       for idx := range units {
+               if !g.configured {
+                       // if RunConfig has been called we can no longer 
register Config
+                       // phases of Units
+                       if c, ok := units[idx].(Config); ok {
+                               g.c = append(g.c, c)
+                               hasRegistered[idx] = true
+                       }
+               }
+               if p, ok := units[idx].(PreRunner); ok {
+                       g.p = append(g.p, p)
+                       hasRegistered[idx] = true
+               }
+               if s, ok := units[idx].(Service); ok {
+                       g.s = append(g.s, s)
+                       hasRegistered[idx] = true
+               }
+       }
+       return hasRegistered
+}
+
+// RunConfig runs the Config phase of all registered Config aware Units.
+// Only use this function if needing to add additional wiring between config
+// and (pre)run phases and a separate PreRunner phase is not an option.
+// In most cases it is best to use the Run method directly as it will run the
+// Config phase prior to executing the PreRunner and Service phases.
+// If an error is returned the application must shut down as it is considered
+// fatal.
+func (g *Group) RunConfig(args ...string) (err error) {
+
+       g.configured = true
+
+       if g.Name == "" {
+               // use the binary name if custom name has not been provided
+               g.Name = path.Base(os.Args[0])
+       }
+
+       g.log = logger.GetLogger(g.Name)
+
+       defer func() {
+               if err != nil {
+                       g.log.Error("unexpected exit", logger.Error(err))
+               }
+       }()
+
+       // run configuration stage
+       g.f = NewFlagSet(g.Name)
+       g.f.SortFlags = false // keep order of flag registration
+       g.f.Usage = func() {
+               fmt.Printf("Flags:\n")
+               g.f.PrintDefaults()
+       }
+
+       // register default rungroup flags
+       var (
+               name         string
+               showVersion  bool
+               showRunGroup bool
+       )
+
+       gFS := NewFlagSet("Common Service options")
+       gFS.SortFlags = false
+       gFS.StringVarP(&name, "name", "n", g.Name, `name of this service`)
+       gFS.BoolVarP(&showVersion, "version", "v", false,
+               "show version information and exit.")
+       gFS.BoolVar(&showRunGroup, "show-rungroup-units", false, "show rungroup 
units")
+       g.f.AddFlagSet(gFS.FlagSet)
+
+       // default to os.Args if args parameter was omitted
+       if len(args) == 0 {
+               args = os.Args[1:]
+       }
+
+       // parse our rungroup flags only (not the plugin ones)
+       _ = gFS.Parse(args)
+       if name != "" {
+               g.Name = name
+       }
+
+       // register flags from attached Config objects
+       fs := make([]*FlagSet, len(g.c))
+       for idx := range g.c {
+               // a Namer might have been deregistered
+               if g.c[idx] == nil {
+                       continue
+               }
+               nameField := logger.String("name", g.c[idx].Name())
+               indexField := logger.Uint32("index", uint32(idx))
+               g.log.Debug("register flags", nameField, indexField,
+                       logger.Uint32("total", uint32(len(g.c))))
+               fs[idx] = g.c[idx].FlagSet()
+               if fs[idx] == nil {
+                       // no FlagSet returned
+                       g.log.Debug("config object did not return a flagset", 
nameField)
+                       continue
+               }
+               fs[idx].VisitAll(func(f *pflag.Flag) {
+                       if g.f.Lookup(f.Name) != nil {
+                               // log duplicate flag
+                               g.log.Warn("ignoring duplicate flag", 
logger.String("name", f.Name), indexField)
+                               return
+                       }
+                       g.f.AddFlag(f)
+               })
+       }
+
+       // parse FlagSet and exit on error
+       if err = g.f.Parse(args); err != nil {
+               return err
+       }
+
+       // bail early on help or version requests
+       switch {
+       case showRunGroup:
+               fmt.Println(g.ListUnits())
+               return nil
+       }
+
+       // Validate Config inputs
+       for idx := range g.c {
+               // a Config might have been deregistered during Run
+               indexField := logger.Uint32("index", uint32(idx))
+               if g.c[idx] == nil {
+                       g.log.Debug("skipping validate", indexField)
+                       continue
+               }
+               g.log.Debug("validate config: %s (%d/%d)", 
logger.String("name", g.c[idx].Name()), indexField,
+                       logger.Uint32("total", uint32(len(g.c))))
+               if vErr := g.c[idx].Validate(); vErr != nil {
+                       err = multierr.Append(err, vErr)
+               }
+       }
+
+       // exit on at least one Validate error
+       if err != nil {
+               return err
+       }
+
+       // log binary name and version
+       g.log.Info("started")
+
+       return nil
+}
+
+// Run will execute all phases of all registered Units and block until an error
+// occurs.
+// If RunConfig has been called prior to Run, the Group's Config phase will be
+// skipped and Run continues with the PreRunner and Service phases.
+//
+// The following phases are executed in the following sequence:
+//
+//   Config phase (serially, in order of Unit registration)
+//     - FlagSet()        Get & register all FlagSets from Config Units.
+//     - Flag Parsing     Using the provided args (os.Args if empty)
+//     - Validate()       Validate Config Units. Exit on first error.
+//
+//   PreRunner phase (serially, in order of Unit registration)
+//     - PreRun()         Execute PreRunner Units. Exit on first error.
+//
+//   Service phase (concurrently)
+//     - Serve()          Execute all Service Units in separate Go routines.
+//     - Wait             Block until one of the Serve() methods returns
+//     - GracefulStop()   Call interrupt handlers of all Service Units.
+//
+//   Run will return with the originating error on:
+//   - first Config.Validate()  returning an error
+//   - first PreRunner.PreRun() returning an error
+//   - first Service.Serve()    returning (error or nil)
+//
+func (g *Group) Run(args ...string) (err error) {
+       // run config registration and flag parsing stages
+       if err = g.RunConfig(args...); err != nil {
+               return err
+       }
+       defer func() {
+               if err != nil {
+                       g.log.Error("unexpected exit", logger.Error(err))
+               }
+       }()
+
+       // execute pre run stage and exit on error
+       for idx := range g.p {
+               // a PreRunner might have been deregistered during Run
+               if g.p[idx] == nil {
+                       continue
+               }
+               g.log.Debug("pre-run:", logger.String("name", g.p[idx].Name()),
+                       logger.Uint32("ran", uint32(idx+1)), 
logger.Uint32("total", uint32(len(g.p))))
+               if err := g.p[idx].PreRun(); err != nil {
+                       return err
+               }
+       }
+
+       // feed our registered services to our internal run.Group
+       for idx := range g.s {
+               // a Service might have been deregistered during Run
+               s := g.s[idx]
+               if s == nil {
+                       continue
+               }
+
+               g.log.Debug("serve:", logger.String("name", s.Name()),
+                       logger.Uint32("ran", uint32(idx+1)), 
logger.Uint32("total", uint32(len(g.s))))
+               g.r.Add(func() error {
+                       return s.Serve()
+               }, func(_ error) {
+                       g.log.Debug("stop:", logger.String("name", s.Name()),
+                               logger.Uint32("ran", uint32(idx+1)), 
logger.Uint32("total", uint32(len(g.s))))
+                       s.GracefulStop()
+               })
+       }
+
+       // start registered services and block
+       return g.r.Run()
+}
+
+// ListUnits returns a list of all Group phases and the Units registered to 
each
+// of them.
+func (g Group) ListUnits() string {
+       var (
+               s string
+               t = "cli"
+       )
+
+       if len(g.c) > 0 {
+               s += "\n- config: "
+               for _, u := range g.c {
+                       if u != nil {
+                               s += u.Name() + " "
+                       }
+               }
+       }
+       if len(g.p) > 0 {
+               s += "\n- prerun: "
+               for _, u := range g.p {
+                       if u != nil {
+                               s += u.Name() + " "
+                       }
+               }
+       }
+       if len(g.s) > 0 {
+               s += "\n- serve : "
+               for _, u := range g.s {
+                       if u != nil {
+                               t = "svc"
+                               s += u.Name() + " "
+                       }
+               }
+       }
+
+       return fmt.Sprintf("Group: %s [%s]%s", g.Name, t, s)
+}
diff --git a/pkg/signal/handler.go b/pkg/signal/handler.go
new file mode 100644
index 0000000..df3d7a3
--- /dev/null
+++ b/pkg/signal/handler.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package signal
+
+import (
+       "errors"
+       "fmt"
+       "os"
+       "os/signal"
+       "syscall"
+
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// ErrSignal is returned when a termination signal is received.
+var ErrSignal = errors.New("signal received")
+var _ run.Service = (*Handler)(nil)
+
+// Handler implements a unix signal handler as run.GroupService.
+type Handler struct {
+       signal chan os.Signal
+       cancel chan struct{}
+}
+
+func (h *Handler) Name() string {
+       return "signal"
+}
+
+// PreRun implements run.PreRunner to initialize the handler.
+func (h *Handler) PreRun() error {
+       h.cancel = make(chan struct{})
+       h.signal = make(chan os.Signal, 1)
+       signal.Notify(h.signal,
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, 
syscall.SIGTERM)
+       return nil
+}
+
+// Serve implements run.Service and listens for incoming unix signals.
+func (h *Handler) Serve() error {
+       for {
+               select {
+               case sig := <-h.signal:
+                       return fmt.Errorf("%s %w", sig, ErrSignal)
+               case <-h.cancel:
+                       signal.Stop(h.signal)
+                       close(h.signal)
+                       return nil
+               }
+       }
+}
+
+// GracefulStop implements run.GroupService and will close the signal handler.
+func (h *Handler) GracefulStop() {
+       close(h.cancel)
+}
diff --git a/pkg/version/version.go b/pkg/version/version.go
index 8ffaaf0..10dc5cb 100644
--- a/pkg/version/version.go
+++ b/pkg/version/version.go
@@ -19,6 +19,11 @@
 // git branches and tags into the binary importing this package.
 package version
 
+import (
+       "fmt"
+       "strings"
+)
+
 // build is to be populated at build time using -ldflags -X.
 var build string
 
@@ -26,3 +31,36 @@ var build string
 func Build() string {
        return build
 }
+
+// Show the service's version information
+func Show(serviceName string) {
+       fmt.Println(serviceName + " " + Parse())
+}
+
+// Parse returns the parsed service's version information. (from raw git label)
+func Parse() string {
+       // versionString syntax:
+       //   <release tag>-<commits since release tag>-g<commit hash>-<branch 
name>
+       v := strings.SplitN(build, "-", 4)
+       // prefix v on semantic versioning tags omitting it
+       // Go module tags should include the 'v'
+       if len(v[0]) > 1 && strings.ToLower(v[0])[0] != 'v' {
+               v[0] = "v" + v[0]
+       }
+       switch {
+       case len(v) != 4:
+               // built without using the make tooling
+               return "v0.0.0-unofficial"
+       case v[1] != "0":
+               // built from a non release commit point
+               // In the version string, the commit tag is prefixed with "-g" 
(which stands for "git").
+               // When printing the version string, remove that prefix to just 
show the real commit hash.
+               return fmt.Sprintf("%s-%s (%s, +%s)", v[0], v[3], v[2][1:], 
v[1])
+       case v[3] != "master":
+               // specific branch release build
+               return fmt.Sprintf("%s-%s", v[0], v[3])
+       default:
+               // master release build
+               return v[0]
+       }
+}

Reply via email to