This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch lint in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c3adea8356fafa4e4c01a443980b619d08373e25 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Dec 2 07:10:42 2022 +0000 Fix lint issues in banyand Signed-off-by: Gao Hongtao <[email protected]> --- banyand/cmd/server/main.go | 1 + banyand/discovery/discovery.go | 4 ++++ banyand/doc.go | 1 + banyand/internal/cmd/root.go | 1 + banyand/internal/cmd/standalone.go | 2 +- banyand/kv/badger.go | 21 +++++++--------- banyand/kv/kv.go | 40 ++++++++++++++++++------------- banyand/liaison/grpc/discovery.go | 6 ++--- banyand/liaison/grpc/measure.go | 9 +++---- banyand/liaison/grpc/registry.go | 2 +- banyand/liaison/grpc/server.go | 34 +++++++++++++------------- banyand/liaison/grpc/stream.go | 7 ++---- banyand/liaison/http/health.go | 4 ++-- banyand/liaison/http/server.go | 14 +++++------ banyand/liaison/liaison.go | 10 +++----- banyand/measure/field_flag_test.go | 1 + banyand/measure/measure.go | 7 +++--- banyand/measure/measure_query.go | 6 +++-- banyand/measure/measure_topn.go | 10 ++++---- banyand/measure/measure_write.go | 10 ++++---- banyand/measure/metadata.go | 2 +- banyand/measure/service.go | 6 +++-- banyand/metadata/metadata.go | 5 ++++ banyand/observability/metric.go | 2 ++ banyand/observability/pprof.go | 1 + banyand/observability/type.go | 4 ++++ banyand/query/processor.go | 3 ++- banyand/query/query.go | 10 ++++---- banyand/queue/local.go | 2 +- banyand/queue/queue.go | 3 +++ banyand/stream/metadata.go | 2 +- banyand/stream/service.go | 9 +++---- banyand/stream/stream.go | 4 +++- banyand/stream/stream_query.go | 6 +++-- banyand/stream/stream_write.go | 6 ++--- banyand/tsdb/block.go | 12 +++++----- banyand/tsdb/block_ctrl.go | 10 ++++---- banyand/tsdb/bucket/bucket.go | 23 +++++++++++------- banyand/tsdb/bucket/queue.go | 18 +++++++++----- banyand/tsdb/bucket/strategy.go | 33 +++++++++++++++---------- banyand/tsdb/index/writer.go | 16 ++++++------- banyand/tsdb/indexdb.go | 14 ++++++----- banyand/tsdb/metric.go | 1 + banyand/tsdb/retention.go | 2 +- 
banyand/tsdb/scope.go | 21 ++++++++-------- banyand/tsdb/segment.go | 2 +- banyand/tsdb/segment_ctrl.go | 8 +++---- banyand/tsdb/series.go | 22 ++++++++++------- banyand/tsdb/series_seek.go | 4 ++++ banyand/tsdb/series_seek_filter.go | 6 ++--- banyand/tsdb/series_seek_sort.go | 4 ++-- banyand/tsdb/series_write.go | 21 ++++++++-------- banyand/tsdb/seriesdb.go | 48 +++++++++++++++++++++++++++++-------- banyand/tsdb/seriesdb_test.go | 2 +- banyand/tsdb/shard.go | 22 +++++++---------- banyand/tsdb/tsdb.go | 49 +++++++++++++++++++++++++++----------- pkg/schema/metadata.go | 4 ++-- pkg/test/setup/setup.go | 2 +- 58 files changed, 353 insertions(+), 246 deletions(-) diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go index 6a2568c..8906f84 100644 --- a/banyand/cmd/server/main.go +++ b/banyand/cmd/server/main.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package main implements the executable banyandb server named banyand. package main import ( diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go index 34c4cf8..daacd59 100644 --- a/banyand/discovery/discovery.go +++ b/banyand/discovery/discovery.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package discovery implements the service discovery. package discovery import ( @@ -24,6 +25,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) +// ServiceRepo provides service subscripting and publishing. +// //go:generate mockgen -destination=./discovery_mock.go -package=discovery github.com/apache/skywalking-banyandb/banyand/discovery ServiceRepo type ServiceRepo interface { NodeID() string @@ -52,6 +55,7 @@ func (r *repo) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, err return r.local.Publish(topic, message...) } +// NewServiceRepo return a new ServiceRepo. 
func NewServiceRepo(_ context.Context) (ServiceRepo, error) { return &repo{ local: bus.NewBus(), diff --git a/banyand/doc.go b/banyand/doc.go index 94f961d..ab85caf 100644 --- a/banyand/doc.go +++ b/banyand/doc.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package banyand implements a executable database server. package banyand /* diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index 01cc159..c5dd966 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -33,6 +33,7 @@ const logo = ` ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═════╝ ╚═════╝ ` +// NewRoot returns a root command. func NewRoot() *cobra.Command { cmd := &cobra.Command{ DisableAutoGenTag: true, diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 020fb51..2f3b2ad 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -65,7 +65,7 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewExecutor(ctx, streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go index 584a664..6fb9c0b 100644 --- a/banyand/kv/badger.go +++ b/banyand/kv/badger.go @@ -34,12 +34,13 @@ import ( ) var ( - _ Store = (*badgerDB)(nil) - _ IndexStore = (*badgerDB)(nil) - _ y.Iterator = (*mergedIter)(nil) - _ TimeSeriesStore = (*badgerTSS)(nil) - bitMergeEntry byte = 1 << 3 - ErrKeyNotFound = badger.ErrKeyNotFound + _ Store = (*badgerDB)(nil) + _ IndexStore = (*badgerDB)(nil) + _ y.Iterator = (*mergedIter)(nil) + _ TimeSeriesStore = (*badgerTSS)(nil) + bitMergeEntry byte = 1 << 3 + // ErrKeyNotFound denotes the expected key can not be got from the kv service. 
+ ErrKeyNotFound = badger.ErrKeyNotFound ) type badgerTSS struct { @@ -147,12 +148,6 @@ func (b *badgerDB) Stats() observability.Statistics { return badgerStats(b.db) } -func (b *badgerDB) Handover(iterator Iterator) error { - return b.db.HandoverIterator(&mergedIter{ - delegated: iterator, - }) -} - func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = opt.PrefetchSize @@ -173,7 +168,7 @@ func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error err := f(b.shardID, k, func() ([]byte, error) { return y.Copy(it.Value().Value), nil }) - if errors.Is(err, ErrStopScan) { + if errors.Is(err, errStopScan) { break } if err != nil { diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go index f180d55..fb3960a 100644 --- a/banyand/kv/kv.go +++ b/banyand/kv/kv.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package kv implements a key-value engine. package kv import ( @@ -31,21 +32,25 @@ import ( ) var ( - ErrStopScan = errors.New("stop scanning") + errStopScan = errors.New("stop scanning") + + // DefaultScanOpts is a helper to provides canonical options for scanning. DefaultScanOpts = ScanOpts{ PrefetchSize: 100, PrefetchValues: true, } ) -type Writer interface { +type writer interface { // Put a value Put(key, val []byte) error PutWithVersion(key, val []byte, version uint64) error } +// ScanFunc is the closure executed on scanning out a pair of key-value. type ScanFunc func(shardID int, key []byte, getVal func() ([]byte, error)) error +// ScanOpts wraps options for scanning the kv storage. type ScanOpts struct { Prefix []byte PrefetchSize int @@ -53,6 +58,7 @@ type ScanOpts struct { Reverse bool } +// Reader allows retrieving data from kv. 
type Reader interface { Iterable // Get a value by its key @@ -65,10 +71,11 @@ type Reader interface { type Store interface { observability.Observable io.Closer - Writer + writer Reader } +// TimeSeriesWriter allows writing to a time-series storage. type TimeSeriesWriter interface { // Put a value with a timestamp/version Put(key, val []byte, ts uint64) error @@ -77,6 +84,7 @@ type TimeSeriesWriter interface { PutAsync(key, val []byte, ts uint64, f func(error)) error } +// TimeSeriesReader allows retrieving data from a time-series storage. type TimeSeriesReader interface { // Get a value by its key and timestamp/version Get(key []byte, ts uint64) ([]byte, error) @@ -91,6 +99,7 @@ type TimeSeriesStore interface { TimeSeriesReader } +// TimeSeriesOptions sets an options for creating a TimeSeriesStore. type TimeSeriesOptions func(TimeSeriesStore) // TSSWithLogger sets a external logger into underlying TimeSeriesStore. @@ -104,6 +113,7 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions { } } +// TSSWithEncoding sets encoding and decoding pools for flushing and compacting. func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encoding.SeriesDecoderPool) TimeSeriesOptions { return func(store TimeSeriesStore) { if btss, ok := store.(*badgerTSS); ok { @@ -117,25 +127,20 @@ func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encodin } } -func TSSWithFlushCallback(callback func()) TimeSeriesOptions { +// TSSWithMemTableSize sets the size of memory table in bytes. 
+func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions { return func(store TimeSeriesStore) { - if btss, ok := store.(*badgerTSS); ok { - btss.dbOpts.FlushCallBack = callback - } - } -} - -func TSSWithMemTableSize(size int64) TimeSeriesOptions { - return func(store TimeSeriesStore) { - if size < 1 { + if sizeInBytes < 1 { return } if btss, ok := store.(*badgerTSS); ok { - btss.dbOpts.MemTableSize = size + btss.dbOpts.MemTableSize = sizeInBytes } } } +// Iterator allows iterating the kv tables. +// TODO: use generic to provide a unique iterator type Iterator interface { Next() Rewind() @@ -147,17 +152,16 @@ type Iterator interface { Close() error } +// Iterable allows creating a Iterator. type Iterable interface { NewIterator(opt ScanOpts) Iterator } -type HandoverCallback func() - +// IndexStore allows writing and reading index format data. type IndexStore interface { observability.Observable Iterable Reader - Handover(iterator Iterator) error Close() error } @@ -184,6 +188,7 @@ func OpenTimeSeriesStore(shardID int, path string, options ...TimeSeriesOptions) return btss, nil } +// StoreOptions sets options for creating Store. type StoreOptions func(Store) // StoreWithLogger sets a external logger into underlying Store. @@ -236,6 +241,7 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) return bdb, nil } +// IndexOptions sets options for creating the index store. type IndexOptions func(store IndexStore) // IndexWithLogger sets a external logger into underlying IndexStore. 
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index cbff00e..bb9f66e 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -33,7 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/partition" ) -var ErrNotExist = errors.New("the object doesn't exist") +var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { shardRepo *shardRepo @@ -61,11 +61,11 @@ func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies [] Name: metadata.Group, })) if !existed { - return nil, nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the shard num by: %v", metadata) + return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the shard num by: %v", metadata) } locator, existed := ds.entityRepo.getLocator(getID(metadata)) if !existed { - return nil, nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the locator by: %v", metadata) + return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, "finding the locator by: %v", metadata) } return locator.Locate(metadata.Name, tagFamilies, shardNum) } diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index be38a38..a0c8d0b 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -43,10 +43,7 @@ type measureService struct { func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error { reply := func() error { - if err := measure.Send(&measurev1.WriteResponse{}); err != nil { - return err - } - return nil + return measure.Send(&measurev1.WriteResponse{}) } sampled := ms.log.Sample(&zerolog.BasicSampler{N: 10}) for { @@ -118,7 +115,7 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest) case []*measurev1.DataPoint: return &measurev1.QueryResponse{DataPoints: d}, nil case common.Error: - return nil, errors.WithMessage(ErrQueryMsg, d.Msg()) + return nil, 
errors.WithMessage(errQueryMsg, d.Msg()) } return nil, nil } @@ -142,7 +139,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq case []*measurev1.TopNList: return &measurev1.TopNResponse{Lists: d}, nil case common.Error: - return nil, errors.WithMessage(ErrQueryMsg, d.Msg()) + return nil, errors.WithMessage(errQueryMsg, d.Msg()) } return nil, nil } diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index 767cb6b..87927e7 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -400,7 +400,7 @@ func (rs *groupRegistryServer) Get(ctx context.Context, req *databasev1.GroupReg }, nil } -func (rs *groupRegistryServer) List(ctx context.Context, req *databasev1.GroupRegistryServiceListRequest) ( +func (rs *groupRegistryServer) List(ctx context.Context, _ *databasev1.GroupRegistryServiceListRequest) ( *databasev1.GroupRegistryServiceListResponse, error, ) { groups, err := rs.schemaRegistry.GroupRegistry().ListGroup(ctx) diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 4aa66ba..5a58d80 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package grpc implements the gRPC services defined by APIs. 
package grpc import ( @@ -45,13 +46,13 @@ import ( const defaultRecvSize = 1024 * 1024 * 10 var ( - ErrServerCert = errors.New("invalid server cert file") - ErrServerKey = errors.New("invalid server key file") - ErrNoAddr = errors.New("no address") - ErrQueryMsg = errors.New("invalid query message") + errServerCert = errors.New("invalid server cert file") + errServerKey = errors.New("invalid server key file") + errNoAddr = errors.New("no address") + errQueryMsg = errors.New("invalid query message") ) -type Server struct { +type server struct { pipeline queue.Queue creds credentials.TransportCredentials repo discovery.ServiceRepo @@ -74,8 +75,9 @@ type Server struct { tls bool } -func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) *Server { - return &Server{ +// NewServer returns a new gRPC server. +func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit { + return &server{ pipeline: pipeline, repo: repo, streamSVC: &streamService{ @@ -108,7 +110,7 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe } } -func (s *Server) PreRun() error { +func (s *server) PreRun() error { s.log = logger.GetLogger("liaison-grpc") components := []struct { discoverySVC *discoveryService @@ -140,11 +142,11 @@ func (s *Server) PreRun() error { return nil } -func (s *Server) Name() string { +func (s *server) Name() string { return "grpc" } -func (s *Server) FlagSet() *run.FlagSet { +func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") fs.IntVarP(&s.maxRecvMsgSize, "max-recv-msg-size", "", defaultRecvSize, "the size of max receiving message") fs.BoolVarP(&s.tls, "tls", "", false, "connection uses TLS if true, else plain TCP") @@ -154,18 +156,18 @@ func (s *Server) FlagSet() *run.FlagSet { return fs } -func (s *Server) Validate() error { +func (s *server) Validate() error { if s.addr == "" { - return 
ErrNoAddr + return errNoAddr } if !s.tls { return nil } if s.certFile == "" { - return ErrServerCert + return errServerCert } if s.keyFile == "" { - return ErrServerKey + return errServerKey } creds, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile) if errTLS != nil { @@ -175,7 +177,7 @@ func (s *Server) Validate() error { return nil } -func (s *Server) Serve() run.StopNotify { +func (s *server) Serve() run.StopNotify { var opts []grpclib.ServerOption if s.tls { opts = []grpclib.ServerOption{grpclib.Creds(s.creds)} @@ -215,7 +217,7 @@ func (s *Server) Serve() run.StopNotify { return s.stopCh } -func (s *Server) GracefulStop() { +func (s *server) GracefulStop() { s.log.Info().Msg("stopping") stopped := make(chan struct{}) go func() { diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 191df78..f70e188 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -43,10 +43,7 @@ type streamService struct { func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { reply := func() error { - if err := stream.Send(&streamv1.WriteResponse{}); err != nil { - return err - } - return nil + return stream.Send(&streamv1.WriteResponse{}) } sampled := s.log.Sample(&zerolog.BasicSampler{N: 10}) for { @@ -127,7 +124,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s case []*streamv1.Element: return &streamv1.QueryResponse{Elements: d}, nil case common.Error: - return nil, errors.WithMessage(ErrQueryMsg, d.Msg()) + return nil, errors.WithMessage(errQueryMsg, d.Msg()) } return nil, nil } diff --git a/banyand/liaison/http/health.go b/banyand/liaison/http/health.go index 7681e21..51f6272 100644 --- a/banyand/liaison/http/health.go +++ b/banyand/liaison/http/health.go @@ -56,7 +56,7 @@ type healthCheckClient struct { conn *grpc.ClientConn } -func (g *healthCheckClient) Check(ctx context.Context, r *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(*grpc_health_v1.HealthCheckResponse, error) { +func (g *healthCheckClient) Check(ctx context.Context, _ *grpc_health_v1.HealthCheckRequest, _ ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { var resp *grpc_health_v1.HealthCheckResponse if err := grpchelper.Request(ctx, 10*time.Second, func(rpcCtx context.Context) (err error) { resp, err = grpc_health_v1.NewHealthClient(g.conn).Check(rpcCtx, @@ -70,6 +70,6 @@ func (g *healthCheckClient) Check(ctx context.Context, r *grpc_health_v1.HealthC return resp, nil } -func (g *healthCheckClient) Watch(ctx context.Context, r *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (grpc_health_v1.Health_WatchClient, error) { +func (g *healthCheckClient) Watch(_ context.Context, _ *grpc_health_v1.HealthCheckRequest, _ ...grpc.CallOption) (grpc_health_v1.Health_WatchClient, error) { return nil, status.Error(codes.Unimplemented, "unimplemented") } diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 4f51e25..6008bc9 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package http implements the gRPC gateway. package http import ( @@ -40,14 +41,13 @@ import ( "github.com/apache/skywalking-banyandb/ui" ) -type ServiceRepo interface { - run.Config - run.Service -} - -var _ ServiceRepo = (*service)(nil) +var ( + _ run.Config = (*service)(nil) + _ run.Service = (*service)(nil) +) -func NewService() ServiceRepo { +// NewService return a http service. +func NewService() run.Unit { return &service{ stopCh: make(chan struct{}), } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index e5a19dc..bb59f22 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+// Package liaison implements a transmission layer between a data layer and a client. package liaison import ( @@ -27,12 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) -type Endpoint interface { - run.Config - run.PreRunner - run.Service -} - -func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) (Endpoint, error) { +// NewEndpoint return a new endpoint which is the entry point for the database server. +func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) (run.Unit, error) { return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil } diff --git a/banyand/measure/field_flag_test.go b/banyand/measure/field_flag_test.go index ae0845f..c47e457 100644 --- a/banyand/measure/field_flag_test.go +++ b/banyand/measure/field_flag_test.go @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package measure import ( diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 0cc528f..4a2e3be 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -15,14 +15,15 @@ // specific language governing permissions and limitations // under the License. +// Package measures implements a time-series-based storage which is consists of a sequence of data points. +// Each data point contains tags and fields. They arrive in a fixed interval. A data point could be updated +// by one with the identical entity(series_id) and timestamp. 
package measure import ( "context" "time" - "go.uber.org/multierr" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" @@ -74,7 +75,7 @@ func (s *measure) EntityLocator() partition.EntityLocator { } func (s *measure) Close() error { - return multierr.Combine(s.processorManager.Close(), s.indexWriter.Close()) + return s.processorManager.Close() } func (s *measure) parseSpec() (err error) { diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go index 1220e73..17ea882 100644 --- a/banyand/measure/measure_query.go +++ b/banyand/measure/measure_query.go @@ -35,13 +35,15 @@ import ( resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" ) -var ErrTagFamilyNotExist = errors.New("tag family doesn't exist") +var errTagFamilyNotExist = errors.New("tag family doesn't exist") +// Query allow to retrieve measure data points. type Query interface { LoadGroup(name string) (resourceSchema.Group, bool) Measure(measure *commonv1.Metadata) (Measure, error) } +// Measure allows inspecting measure data points' details. type Measure interface { io.Closer Shards(entity tsdb.Entity) ([]tsdb.Shard, error) @@ -135,7 +137,7 @@ func (s *measure) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFam } } if tagSpec == nil { - return nil, ErrTagFamilyNotExist + return nil, errTagFamilyNotExist } for i, tag := range tagFamily.GetTags() { tags[i] = &modelv1.Tag{ diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go index e5ad23d..4d51099 100644 --- a/banyand/measure/measure_topn.go +++ b/banyand/measure/measure_topn.go @@ -47,7 +47,8 @@ import ( const ( timeBucketFormat = "200601021504" - TopNTagFamily = "__topN__" + // TopNTagFamily is the identity of a tag family which contains the topN calculated result. 
+ TopNTagFamily = "__topN__" ) var ( @@ -57,6 +58,7 @@ var ( errUnsupportedConditionValueType = errors.New("unsupported value type in the condition") + // TopNValueFieldSpec denotes the field specification of the topN calculated result. TopNValueFieldSpec = &databasev1.FieldSpec{ Name: "value", FieldType: databasev1.FieldType_FIELD_TYPE_INT, @@ -109,7 +111,7 @@ func (t *topNStreamingProcessor) run(ctx context.Context) { // Teardown is called by the Flow as a lifecycle hook. // So we should not block on err channel within this method. -func (t *topNStreamingProcessor) Teardown(ctx context.Context) error { +func (t *topNStreamingProcessor) Teardown(_ context.Context) error { t.Wait() return nil } @@ -280,13 +282,13 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor { streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 { return record.Data().(flow.Data)[1].(int64) }), - OrderBy(t.topNSchema.GetFieldValueSort()), + orderBy(t.topNSchema.GetFieldValueSort()), ).To(t).Open() go t.handleError() return t } -func OrderBy(sort modelv1.Sort) streaming.TopNOption { +func orderBy(sort modelv1.Sort) streaming.TopNOption { if sort == modelv1.Sort_SORT_ASC { return streaming.OrderBy(streaming.ASC) } diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index ff26fc2..a29c535 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -37,7 +37,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -var ErrMalformedElement = errors.New("element is malformed") +var errMalformedElement = errors.New("element is malformed") func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *measurev1.DataPointValue) error { t := value.GetTimestamp().AsTime().Local() @@ -47,10 +47,10 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity [] sm := s.schema fLen := len(value.GetTagFamilies()) if fLen < 1 { 
- return errors.Wrap(ErrMalformedElement, "no tag family") + return errors.Wrap(errMalformedElement, "no tag family") } if fLen > len(sm.TagFamilies) { - return errors.Wrap(ErrMalformedElement, "tag family number is more than expected") + return errors.Wrap(errMalformedElement, "tag family number is more than expected") } shard, err := s.databaseSupplier.SupplyTSDB().Shard(shardID) if err != nil { @@ -80,7 +80,7 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity [] builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb) } if len(value.GetFields()) > len(sm.GetFields()) { - return nil, errors.Wrap(ErrMalformedElement, "fields number is more than expected") + return nil, errors.Wrap(errMalformedElement, "fields number is more than expected") } for fi, fieldValue := range value.GetFields() { fieldSpec := sm.GetFields()[fi] @@ -90,7 +90,7 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity [] continue } if fType != fieldSpec.GetFieldType() { - return nil, errors.Wrapf(ErrMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) + return nil, errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) } data := encodeFieldValue(fieldValue) if data == nil { diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 88aa56c..06b1f97 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -209,7 +209,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource }, s.l) } -func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { +func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return s.metadata.MeasureRegistry().GetMeasure(ctx, md) diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 
6b15ede..60e615d 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -38,10 +38,12 @@ import ( ) var ( - ErrEmptyRootPath = errors.New("root path is empty") + errEmptyRootPath = errors.New("root path is empty") + // ErrMeasureNotExist denotes a measure doesn't exist in the metadata repo. ErrMeasureNotExist = errors.New("measure doesn't exist") ) +// Service allows inspecting the measure data points. type Service interface { run.PreRunner run.Config @@ -85,7 +87,7 @@ func (s *service) FlagSet() *run.FlagSet { func (s *service) Validate() error { if s.root == "" { - return ErrEmptyRootPath + return errEmptyRootPath } return nil } diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index acaf677..44f5953 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// Package metadata implements a Raft-based distributed metadata storage system. +// Powered by etcd. package metadata import ( @@ -39,6 +41,7 @@ type IndexFilter interface { Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) } +// Repo is the facade to interact with the metadata repository. type Repo interface { IndexFilter StreamRegistry() schema.Stream @@ -50,6 +53,7 @@ type Repo interface { PropertyRegistry() schema.Property } +// Service is the metadata repository. type Service interface { Repo run.PreRunner @@ -101,6 +105,7 @@ func (s *service) GracefulStop() { <-s.schemaRegistry.StopNotify() } +// NewService returns a new metadata repository Service. func NewService(_ context.Context) (Service, error) { return &service{}, nil } diff --git a/banyand/observability/metric.go b/banyand/observability/metric.go index b4598c6..9430237 100644 --- a/banyand/observability/metric.go +++ b/banyand/observability/metric.go @@ -14,6 +14,7 @@ // KIND, either express or implied. 
See the License for the // specific language governing permissions and limitations // under the License. + package observability import ( @@ -31,6 +32,7 @@ var ( _ run.Config = (*metricService)(nil) ) +// NewMetricService returns a metric service. func NewMetricService() run.Service { return &metricService{ closer: run.NewCloser(1), diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go index 1340f9b..0e65136 100644 --- a/banyand/observability/pprof.go +++ b/banyand/observability/pprof.go @@ -31,6 +31,7 @@ var ( _ run.Config = (*metricService)(nil) ) +// NewProfService returns a pprof service. func NewProfService() run.Service { return &pprofService{ closer: run.NewCloser(1), diff --git a/banyand/observability/type.go b/banyand/observability/type.go index f6829d1..d6e4b09 100644 --- a/banyand/observability/type.go +++ b/banyand/observability/type.go @@ -14,17 +14,21 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +// Package observability provides metrics, profiling, and etc. package observability import "errors" var errNoAddr = errors.New("no address") +// Statistics represents a sample of a module. type Statistics struct { MemBytes int64 MaxMemBytes int64 } +// Observable allows sampling. 
type Observable interface { Stats() Statistics } diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 43100f2..6171b1a 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -37,6 +37,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/executor" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" + "github.com/apache/skywalking-banyandb/pkg/run" ) const ( @@ -44,7 +45,7 @@ const ( ) var ( - _ Executor = (*queryService)(nil) + _ run.PreRunner = (*queryService)(nil) _ bus.MessageListener = (*streamQueryProcessor)(nil) _ bus.MessageListener = (*measureQueryProcessor)(nil) _ bus.MessageListener = (*topNQueryProcessor)(nil) diff --git a/banyand/query/query.go b/banyand/query/query.go index 267a922..3e4eab1 100644 --- a/banyand/query/query.go +++ b/banyand/query/query.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package query implements the query module for liaison and other modules to retrieve data. package query import ( @@ -28,13 +29,10 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) -type Executor interface { - run.PreRunner -} - -func NewExecutor(_ context.Context, streamService stream.Service, measureService measure.Service, +// NewService returns a new query service. +func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, metaService metadata.Service, serviceRepo discovery.ServiceRepo, pipeline queue.Queue, -) (Executor, error) { +) (run.Unit, error) { svc := &queryService{ metaService: metaService, serviceRepo: serviceRepo, diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 1061cbf..ec4a95e 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License.
-// Package queue implements the data transmission queue +// Package queue implements the data transmission queue. package queue import ( diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 7941a9b..0946ecb 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -25,6 +25,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) +// Queue builds a data transmission tunnel between subscribers and publishers. + //go:generate mockgen -destination=./queue_mock.go -package=queue github.com/apache/skywalking-banyandb/pkg/bus MessageListener type Queue interface { run.Unit @@ -33,6 +35,7 @@ type Queue interface { run.Service } +// NewQueue returns a new Queue which relies on the discovery service. func NewQueue(_ context.Context, repo discovery.ServiceRepo) (Queue, error) { return &local{ repo: repo, diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 5b200d9..286a276 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -195,7 +195,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource }, s.l), nil } -func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { +func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return s.metadata.StreamRegistry().GetStream(ctx, md) diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 77c4989..ac95748 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -36,10 +36,11 @@ import ( ) var ( - ErrEmptyRootPath = errors.New("root path is empty") - ErrStreamNotExist = errors.New("stream doesn't exist") + errEmptyRootPath = errors.New("root path is empty") + errStreamNotExist = errors.New("stream doesn't exist") ) +// Service allows inspecting the stream elements.
type Service interface { run.PreRunner run.Config @@ -64,7 +65,7 @@ type service struct { func (s *service) Stream(metadata *commonv1.Metadata) (Stream, error) { sm, ok := s.schemaRepo.loadStream(metadata) if !ok { - return nil, errors.WithStack(ErrStreamNotExist) + return nil, errors.WithStack(errStreamNotExist) } return sm, nil } @@ -80,7 +81,7 @@ func (s *service) FlagSet() *run.FlagSet { func (s *service) Validate() error { if s.root == "" { - return ErrEmptyRootPath + return errEmptyRootPath } return nil } diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 41f45a4..c8866cf 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// Package stream implements a time-series-based storage which consists of a sequence of elements. +// Each element drops in an arbitrary interval. They are immutable and cannot be updated or overwritten. package stream import ( @@ -65,7 +67,7 @@ func (s *stream) EntityLocator() partition.EntityLocator { } func (s *stream) Close() error { - return s.indexWriter.Close() + return nil } func (s *stream) parseSpec() { diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go index 1f89be8..5b3ef7c 100644 --- a/banyand/stream/stream_query.go +++ b/banyand/stream/stream_query.go @@ -31,12 +31,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/partition" ) -var ErrTagFamilyNotExist = errors.New("tag family doesn't exist") +var errTagFamilyNotExist = errors.New("tag family doesn't exist") +// Query allows retrieving elements in a series of streams. type Query interface { Stream(stream *commonv1.Metadata) (Stream, error) } +// Stream allows inspecting elements' details.
type Stream interface { io.Closer Shards(entity tsdb.Entity) ([]tsdb.Shard, error) @@ -101,7 +103,7 @@ func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFami } } if tagSpec == nil { - return nil, ErrTagFamilyNotExist + return nil, errTagFamilyNotExist } for i, tag := range tagFamily.GetTags() { tags[i] = &modelv1.Tag{ diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 3a77da3..01bb656 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -36,7 +36,7 @@ import ( ) var ( - ErrMalformedElement = errors.New("element is malformed") + errMalformedElement = errors.New("element is malformed") writtenBytes *prometheus.CounterVec ) @@ -59,10 +59,10 @@ func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb. sm := s.schema fLen := len(value.GetTagFamilies()) if fLen < 1 { - return errors.Wrap(ErrMalformedElement, "no tag family") + return errors.Wrap(errMalformedElement, "no tag family") } if fLen > len(sm.TagFamilies) { - return errors.Wrap(ErrMalformedElement, "tag family number is more than expected") + return errors.Wrap(errMalformedElement, "tag family number is more than expected") } shard, err := s.db.SupplyTSDB().Shard(shardID) if err != nil { diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index d5c48cf..cdfd8f2 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -53,7 +53,7 @@ const ( defaultEnqueueTimeout = 500 * time.Millisecond ) -var ErrBlockClosingInterrupted = errors.New("interrupt to close the block") +var errBlockClosingInterrupted = errors.New("interrupt to close the block") type block struct { encodingMethod EncodingMethod @@ -189,9 +189,9 @@ func (b *block) open() (err error) { return nil } -func (b *block) delegate(ctx context.Context) (BlockDelegate, error) { +func (b *block) delegate(ctx context.Context) (blockDelegate, error) { if b.deleted.Load() { - return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is 
deleted", b) + return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b) } blockID := BlockID{ BlockID: b.blockID, @@ -276,7 +276,7 @@ func (b *block) close(ctx context.Context) (err error) { select { case <-ctx.Done(): stopWaiting.Store(true) - return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b) + return errors.Wrapf(errBlockClosingInterrupted, "block:%s", b) case <-ch: } b.closed.Store(true) @@ -313,7 +313,7 @@ func (b *block) stats() (names []string, stats []observability.Statistics) { return names, stats } -type BlockDelegate interface { +type blockDelegate interface { io.Closer contains(ts time.Time) bool write(key []byte, val []byte, ts time.Time) error @@ -330,7 +330,7 @@ type BlockDelegate interface { String() string } -var _ BlockDelegate = (*bDelegate)(nil) +var _ blockDelegate = (*bDelegate)(nil) type bDelegate struct { delegate *block diff --git a/banyand/tsdb/block_ctrl.go b/banyand/tsdb/block_ctrl.go index 475b218..b1071d3 100644 --- a/banyand/tsdb/block_ctrl.go +++ b/banyand/tsdb/block_ctrl.go @@ -93,7 +93,7 @@ func (bc *blockController) Next() (bucket.Reporter, error) { } b := c.(*block) - return bc.newHeadBlock(bc.blockSize.NextTime(b.Start)) + return bc.newHeadBlock(bc.blockSize.nextTime(b.Start)) } func (bc *blockController) newHeadBlock(now time.Time) (*block, error) { @@ -157,14 +157,14 @@ func (bc *blockController) Parse(value string) (time.Time, error) { panic("invalid interval unit") } -func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) { +func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) { bb := bc.search(func(b *block) bool { return b.Overlapping(timeRange) }) if bb == nil { return nil, nil } - dd := make([]BlockDelegate, len(bb)) + dd := make([]blockDelegate, len(bb)) for i, b := range bb { d, err := b.delegate(ctx) if err != nil { @@ -175,7 +175,7 @@ func (bc *blockController) span(ctx 
context.Context, timeRange timestamp.TimeRan return dd, nil } -func (bc *blockController) get(ctx context.Context, blockID SectionID) (BlockDelegate, error) { +func (bc *blockController) get(ctx context.Context, blockID SectionID) (blockDelegate, error) { b := bc.getBlock(blockID) if b != nil { return b.delegate(ctx) @@ -251,7 +251,7 @@ func (bc *blockController) create(start time.Time) (*block, error) { next = s } } - stdEnd := bc.blockSize.NextTime(start) + stdEnd := bc.blockSize.nextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { end = next.Start diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go index 6bdb01f..e05a400 100644 --- a/banyand/tsdb/bucket/bucket.go +++ b/banyand/tsdb/bucket/bucket.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package bucket implements a rolling bucket system. package bucket import ( @@ -29,21 +30,25 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -var ErrReporterClosed = errors.New("reporter is closed") +var errReporterClosed = errors.New("reporter is closed") +// Controller defines the provider of a Reporter. type Controller interface { Current() (Reporter, error) Next() (Reporter, error) OnMove(prev, next Reporter) } +// Status is a sample of the Reporter's status. type Status struct { Capacity int Volume int } +// Channel reports the status of a Reporter. type Channel chan Status +// Reporter allows reporting status to its supervisor. type Reporter interface { // TODO: refactor Report to return a status. It's too complicated to return a channel Report() (Channel, error) @@ -51,15 +56,16 @@ type Reporter interface { } var ( - _ Reporter = (*dummyReporter)(nil) - _ Reporter = (*timeBasedReporter)(nil) - DummyReporter = &dummyReporter{} + _ Reporter = (*dummyReporter)(nil) + _ Reporter = (*timeBasedReporter)(nil) + // DummyReporter is a special Reporter to avoid nil errors. 
+ DummyReporter = &dummyReporter{} ) type dummyReporter struct{} func (*dummyReporter) Report() (Channel, error) { - return nil, ErrReporterClosed + return nil, errReporterClosed } func (*dummyReporter) Stop() { @@ -77,6 +83,7 @@ type timeBasedReporter struct { name string } +// NewTimeBasedReporter returns a Reporter which sends reports based on time. func NewTimeBasedReporter(name string, timeRange timestamp.TimeRange, clock timestamp.Clock, scheduler *timestamp.Scheduler) Reporter { if timeRange.End.Before(clock.Now()) { return DummyReporter @@ -93,11 +100,11 @@ func NewTimeBasedReporter(name string, timeRange timestamp.TimeRange, clock time func (tr *timeBasedReporter) Report() (Channel, error) { if tr.scheduler.Closed() { - return nil, ErrReporterClosed + return nil, errReporterClosed } now := tr.clock.Now() if now.After(tr.End) { - return nil, ErrReporterClosed + return nil, errReporterClosed } ch := make(Channel, 1) interval := tr.Duration() >> 4 @@ -137,7 +144,7 @@ func (tr *timeBasedReporter) Report() (Channel, error) { }); err != nil { close(ch) if errors.Is(err, timestamp.ErrSchedulerClosed) { - return nil, ErrReporterClosed + return nil, errReporterClosed } return nil, err } diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go index 18e900a..6cb1bfa 100644 --- a/banyand/tsdb/bucket/queue.go +++ b/banyand/tsdb/bucket/queue.go @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package bucket import ( @@ -31,10 +32,13 @@ import ( ) type ( - EvictFn func(ctx context.Context, id interface{}) error + // EvictFn is a closure executed on evicting an item. + EvictFn func(ctx context.Context, id interface{}) error + // OnAddRecentFn is a notifier on adding an item into the recent queue. OnAddRecentFn func() error ) +// Queue is an LRU queue.
type Queue interface { Touch(id fmt.Stringer) bool Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error @@ -45,13 +49,14 @@ type Queue interface { } const ( - QueueName = "block-queue-cleanup" - DefaultRecentRatio = 0.25 + // QueueName is the identity of the queue. + QueueName = "block-queue-cleanup" + defaultRecentRatio = 0.25 defaultEvictBatchSize = 10 ) -var ErrInvalidSize = errors.New("invalid size") +var errInvalidSize = errors.New("invalid size") type lruQueue struct { recent simplelru.LRUCache @@ -65,12 +70,13 @@ type lruQueue struct { lock sync.RWMutex } +// NewQueue returns a Queue for block eviction. func NewQueue(l *logger.Logger, size int, maxSize int, scheduler *timestamp.Scheduler, evictFn EvictFn) (Queue, error) { if size <= 0 { - return nil, ErrInvalidSize + return nil, errInvalidSize } - recentSize := int(float64(size) * DefaultRecentRatio) + recentSize := int(float64(size) * defaultRecentRatio) evictSize := maxSize - size recent, err := simplelru.NewLRU(size, nil) diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go index dda86d8..31b601d 100644 --- a/banyand/tsdb/bucket/strategy.go +++ b/banyand/tsdb/bucket/strategy.go @@ -30,41 +30,48 @@ import ( ) var ( + // ErrInvalidParameter denotes input parameters are invalid. ErrInvalidParameter = errors.New("parameters are invalid") - ErrNoMoreBucket = errors.New("no more buckets") + // ErrNoMoreBucket denotes the bucket volume reaches the limitation. + ErrNoMoreBucket = errors.New("no more buckets") ) -type Ratio float64 +type ratio float64 +// Strategy controls Reporters with Controller's help. type Strategy struct { optionsErr error ctrl Controller current atomic.Value logger *logger.Logger closer *run.Closer - ratio Ratio + ratio ratio currentRatio uint64 } +// StrategyOptions sets how to create a Strategy. type StrategyOptions func(*Strategy) -func WithNextThreshold(ratio Ratio) StrategyOptions { +// WithNextThreshold sets a ratio to create the next Reporter.
+func WithNextThreshold(r ratio) StrategyOptions { return func(s *Strategy) { - if ratio > 1.0 { + if r > 1.0 { s.optionsErr = multierr.Append(s.optionsErr, - errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", ratio)) + errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", r)) return } - s.ratio = ratio + s.ratio = r } } +// WithLogger sets a logger.Logger. func WithLogger(logger *logger.Logger) StrategyOptions { return func(s *Strategy) { s.logger = logger } } +// NewStrategy returns a Strategy. func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) { if ctrl == nil { return nil, errors.Wrap(ErrInvalidParameter, "controller is absent") @@ -98,12 +105,13 @@ func (s *Strategy) resetCurrent() error { return nil } +// Run the Strategy in the background. func (s *Strategy) Run() { go func(s *Strategy) { defer s.closer.Done() for { c, err := s.current.Load().(Reporter).Report() - if errors.Is(err, ErrReporterClosed) { + if errors.Is(err, errReporterClosed) { return } if err != nil { @@ -138,9 +146,9 @@ func (s *Strategy) observe(c Channel) bool { if !more { return moreBucket } - ratio := Ratio(status.Volume) / Ratio(status.Capacity) - atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio))) - if ratio >= s.ratio && next == nil && moreBucket { + r := ratio(status.Volume) / ratio(status.Capacity) + atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(r))) + if r >= s.ratio && next == nil && moreBucket { n, err := s.ctrl.Next() if errors.Is(err, ErrNoMoreBucket) { moreBucket = false @@ -151,7 +159,7 @@ func (s *Strategy) observe(c Channel) bool { next = n } } - if ratio >= 1.0 { + if r >= 1.0 { s.ctrl.OnMove(s.current.Load().(Reporter), next) if next != nil { s.current.Store(next) @@ -164,6 +172,7 @@ func (s *Strategy) observe(c Channel) bool { } } +// Close the Strategy running in the background. 
func (s *Strategy) Close() { s.closer.CloseThenWait() } diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go index 5e98d04..7385466 100644 --- a/banyand/tsdb/index/writer.go +++ b/banyand/tsdb/index/writer.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +// Package index implements transferring data to indices. package index import ( @@ -35,8 +36,7 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) -type CallbackFn func() - +// Message wraps value and other info to generate relative indices. type Message struct { Value Value LocalWriter tsdb.Writer @@ -44,11 +44,13 @@ type Message struct { Scope tsdb.Entry } +// Value represents the input data for generating indices. type Value struct { Timestamp time.Time TagFamilies []*modelv1.TagFamilyForWrite } +// WriterOptions wrap all options to create an index writer. type WriterOptions struct { DB tsdb.Supplier Families []*databasev1.TagFamilySpec @@ -64,6 +66,7 @@ const ( tree ) +// Writer generates indices based on index rules. type Writer struct { db tsdb.Supplier l *logger.Logger @@ -72,6 +75,7 @@ type Writer struct { enableGlobalIndex bool } +// NewWriter returns a new Writer with WriterOptions. func NewWriter(ctx context.Context, options WriterOptions) *Writer { w := new(Writer) parentLogger := ctx.Value(logger.ContextKey) @@ -127,10 +131,6 @@ func (s *Writer) Write(m Message) { } } -func (s *Writer) Close() error { - return nil -} - // TODO: should listen to pipeline in a distributed cluster. 
func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error { collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error { @@ -225,12 +225,12 @@ func (s *Writer) writeLocalIndex(writer tsdb.Writer, value Value) (err error) { ) } -var ErrUnsupportedIndexType = errors.New("unsupported index type") +var errUnsupportedIndexType = errors.New("unsupported index type") func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][]byte, err error) { val = make([][]byte, 0) if len(ruleIndex.TagIndices) != 1 { - return nil, errors.WithMessagef(ErrUnsupportedIndexType, + return nil, errors.WithMessagef(errUnsupportedIndexType, "the index rule %s(%v) didn't support composited tags", ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags) } diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go index 797b111..cb82f88 100644 --- a/banyand/tsdb/indexdb.go +++ b/banyand/tsdb/indexdb.go @@ -30,16 +30,19 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" ) +// IndexDatabase allows stocking index data. type IndexDatabase interface { WriterBuilder() IndexWriterBuilder Seek(field index.Field) ([]GlobalItemID, error) } +// IndexWriter allows ingesting index data. type IndexWriter interface { WriteLSMIndex(field []index.Field) error WriteInvertedIndex(field []index.Field) error } +// IndexWriterBuilder is a helper to build IndexWriter. 
type IndexWriterBuilder interface { Scope(scope Entry) IndexWriterBuilder Time(ts time.Time) IndexWriterBuilder @@ -47,8 +50,6 @@ type IndexWriterBuilder interface { Build() (IndexWriter, error) } -type IndexSeekBuilder interface{} - var _ IndexDatabase = (*indexDB)(nil) type indexDB struct { @@ -65,7 +66,7 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) { for _, s := range i.segCtrl.segments() { err = s.globalIndex.GetAll(f, func(rawBytes []byte) error { id := &GlobalItemID{} - errUnMarshal := id.UnMarshal(rawBytes) + errUnMarshal := id.unMarshal(rawBytes) if errUnMarshal != nil { return errUnMarshal } @@ -120,7 +121,7 @@ func (i *indexWriterBuilder) Build() (IndexWriter, error) { return nil, err } if i.globalItemID == nil { - return nil, errors.WithStack(ErrNoVal) + return nil, errors.WithStack(errNoVal) } return &indexWriter{ scope: i.scope, @@ -155,7 +156,7 @@ func (i *indexWriter) WriteLSMIndex(fields []index.Field) (err error) { err = multierr.Append(err, errInternal) continue } - err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))) + err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), uint64(i.ts.UnixNano()))) } return err } @@ -170,11 +171,12 @@ func (i *indexWriter) WriteInvertedIndex(fields []index.Field) (err error) { err = multierr.Append(err, errInternal) continue } - err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), uint64(i.ts.UnixNano()))) + err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), uint64(i.ts.UnixNano()))) } return err } +// GlobalSeriesID encodes Entry to common.SeriesID. 
func GlobalSeriesID(scope Entry) common.SeriesID { return common.SeriesID(convert.Hash(scope)) } diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go index cd99c68..40340b5 100644 --- a/banyand/tsdb/metric.go +++ b/banyand/tsdb/metric.go @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package tsdb import ( diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go index 6eb2e0f..43ff3f3 100644 --- a/banyand/tsdb/retention.go +++ b/banyand/tsdb/retention.go @@ -47,7 +47,7 @@ func newRetentionTask(segment *segmentController, ttl IntervalRule) *retentionTa segment: segment, option: cron.Minute | cron.Hour, expr: expr, - duration: ttl.EstimatedDuration(), + duration: ttl.estimatedDuration(), } } diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go index ab9711c..952fd3c 100644 --- a/banyand/tsdb/scope.go +++ b/banyand/tsdb/scope.go @@ -24,45 +24,46 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" ) -var _ Shard = (*ScopedShard)(nil) +var _ Shard = (*scopedShard)(nil) -type ScopedShard struct { +type scopedShard struct { delegated Shard scope Entry } +// NewScopedShard returns a shard in a scope. 
func NewScopedShard(scope Entry, delegated Shard) Shard { - return &ScopedShard{ + return &scopedShard{ scope: scope, delegated: delegated, } } -func (sd *ScopedShard) Close() error { +func (sd *scopedShard) Close() error { // the delegate can't close the underlying shard return nil } -func (sd *ScopedShard) ID() common.ShardID { +func (sd *scopedShard) ID() common.ShardID { return sd.delegated.ID() } -func (sd *ScopedShard) Series() SeriesDatabase { +func (sd *scopedShard) Series() SeriesDatabase { return &scopedSeriesDatabase{ scope: sd.scope, delegated: sd.delegated.Series(), } } -func (sd *ScopedShard) Index() IndexDatabase { +func (sd *scopedShard) Index() IndexDatabase { return sd.delegated.Index() } -func (sd *ScopedShard) TriggerSchedule(task string) bool { +func (sd *scopedShard) TriggerSchedule(task string) bool { return sd.delegated.TriggerSchedule(task) } -func (sd *ScopedShard) State() ShardState { +func (sd *scopedShard) State() ShardState { return sd.delegated.State() } @@ -90,5 +91,5 @@ func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) { } func (sdd *scopedSeriesDatabase) List(ctx context.Context, path Path) (SeriesList, error) { - return sdd.delegated.List(ctx, path.Prepend(sdd.scope)) + return sdd.delegated.List(ctx, path.prepend(sdd.scope)) } diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index 4a21a71..c6582f2 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -32,7 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -var ErrEndOfSegment = errors.New("reached the end of the segment") +var errEndOfSegment = errors.New("reached the end of the segment") type segment struct { globalIndex kv.Store diff --git a/banyand/tsdb/segment_ctrl.go b/banyand/tsdb/segment_ctrl.go index de02ef1..98653a2 100644 --- a/banyand/tsdb/segment_ctrl.go +++ b/banyand/tsdb/segment_ctrl.go @@ -134,8 +134,8 @@ func (sc *segmentController) Next() (bucket.Reporter, error) { return nil, err } 
seg := c.(*segment) - reporter, err := sc.create(sc.segmentSize.NextTime(seg.Start)) - if errors.Is(err, ErrEndOfSegment) { + reporter, err := sc.create(sc.segmentSize.nextTime(seg.Start)) + if errors.Is(err, errEndOfSegment) { return nil, bucket.ErrNoMoreBucket } return reporter, err @@ -187,7 +187,7 @@ func (sc *segmentController) open() error { defer sc.Unlock() return loadSections(sc.location, sc, sc.segmentSize, func(start, end time.Time) error { _, err := sc.load(start, end, sc.location) - if errors.Is(err, ErrEndOfSegment) { + if errors.Is(err, errEndOfSegment) { return nil } return err @@ -207,7 +207,7 @@ func (sc *segmentController) create(start time.Time) (*segment, error) { next = s } } - stdEnd := sc.segmentSize.NextTime(start) + stdEnd := sc.segmentSize.nextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { end = next.Start diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index 7ea6e66..a872c82 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -33,11 +33,14 @@ import ( ) var ( + // ErrEmptySeriesSpan hints there is no any data blocks based on the input time range. ErrEmptySeriesSpan = errors.New("there is no data in such time range") - ErrItemIDMalformed = errors.New("serialized item id is malformed") - ErrBlockAbsent = errors.New("block is absent") + errItemIDMalformed = errors.New("serialized item id is malformed") + errBlockAbsent = errors.New("block is absent") ) +// GlobalItemID is the top level identity of an item. +// The item could be retrieved by a GlobalItemID in a tsdb. 
type GlobalItemID struct { ShardID common.ShardID segID SectionID @@ -46,7 +49,7 @@ type GlobalItemID struct { ID common.ItemID } -func (i *GlobalItemID) Marshal() []byte { +func (i *GlobalItemID) marshal() []byte { return bytes.Join([][]byte{ convert.Uint32ToBytes(uint32(i.ShardID)), sectionIDToBytes(i.segID), @@ -56,9 +59,9 @@ func (i *GlobalItemID) Marshal() []byte { }, nil) } -func (i *GlobalItemID) UnMarshal(data []byte) error { +func (i *GlobalItemID) unMarshal(data []byte) error { if len(data) != 4+4+4+8+8 { - return ErrItemIDMalformed + return errItemIDMalformed } var offset int i.ShardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4])) @@ -71,6 +74,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error { return nil } +// Series denotes a series of data points grouped by a common.SeriesID. +// common.SeriesID is encoded by an entity defined by Stream or Measure. type Series interface { ID() common.SeriesID Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) @@ -79,6 +84,7 @@ type Series interface { String() string } +// SeriesSpan is a span in a time series. It contains data blocks in such a time range.
type SeriesSpan interface { io.Closer WriterBuilder() WriterBuilder @@ -101,7 +107,7 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, err return nil, nil, err } if b == nil { - return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", id) + return nil, nil, errors.WithMessagef(errBlockAbsent, "id: %v", id) } return &item{ data: b.dataReader(), @@ -191,7 +197,7 @@ type seriesSpan struct { l *logger.Logger timeRange timestamp.TimeRange series string - blocks []BlockDelegate + blocks []blockDelegate seriesID common.SeriesID shardID common.ShardID } @@ -211,7 +217,7 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder { return newSeekerBuilder(s) } -func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks []BlockDelegate, id common.SeriesID, series string, shardID common.ShardID) *seriesSpan { +func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks []blockDelegate, id common.SeriesID, series string, shardID common.ShardID) *seriesSpan { s := &seriesSpan{ blocks: blocks, seriesID: id, diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go index db66d98..28147f7 100644 --- a/banyand/tsdb/series_seek.go +++ b/banyand/tsdb/series_seek.go @@ -34,12 +34,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) +// Iterator allows iterating a series in a time span. type Iterator interface { Next() bool Val() Item Close() error } +// Item allows retrieving raw data from an item. type Item interface { Family(family []byte) ([]byte, error) PrintContext(l *logger.Logger, family []byte, n int) @@ -49,6 +51,7 @@ type Item interface { Time() uint64 } +// SeekerBuilder a helper to build a Seeker. type SeekerBuilder interface { Filter(predicator index.Filter) SeekerBuilder OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder @@ -56,6 +59,7 @@ type SeekerBuilder interface { Build() (Seeker, error) } +// Seeker allows searching data in a Database. 
type Seeker interface { Seek() ([]Iterator, error) } diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go index 7e8cd4a..64d7ed1 100644 --- a/banyand/tsdb/series_seek_filter.go +++ b/banyand/tsdb/series_seek_filter.go @@ -24,14 +24,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" ) -var ErrUnsupportedIndexRule = errors.New("the index rule is not supported") +var errUnsupportedIndexRule = errors.New("the index rule is not supported") func (s *seekerBuilder) Filter(predicator index.Filter) SeekerBuilder { s.predicator = predicator return s } -func (s *seekerBuilder) buildIndexFilter(block BlockDelegate) (filterFn, error) { +func (s *seekerBuilder) buildIndexFilter(block blockDelegate) (filterFn, error) { if s.predicator == nil { return nil, nil } @@ -42,7 +42,7 @@ func (s *seekerBuilder) buildIndexFilter(block BlockDelegate) (filterFn, error) case databasev1.IndexRule_TYPE_TREE: return block.lsmIndexReader(), nil default: - return nil, ErrUnsupportedIndexRule + return nil, errUnsupportedIndexRule } }, s.seriesSpan.seriesID) if err != nil { diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go index d56cacf..40d05b2 100644 --- a/banyand/tsdb/series_seek_sort.go +++ b/banyand/tsdb/series_seek_sort.go @@ -36,7 +36,7 @@ import ( ) var ( - ErrUnspecifiedIndexType = errors.New("Unspecified index type") + errUnspecifiedIndexType = errors.New("Unspecified index type") emptyFilters = make([]filterFn, 0) ) @@ -89,7 +89,7 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) { case databasev1.IndexRule_TYPE_INVERTED: inner, err = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order) case databasev1.IndexRule_TYPE_UNSPECIFIED: - return nil, errors.WithMessagef(ErrUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) + return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting) } if err != nil { return nil, err diff 
--git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go index b6da92c..f903566 100644 --- a/banyand/tsdb/series_write.go +++ b/banyand/tsdb/series_write.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index" ) +// WriterBuilder is a helper to build a Writer. type WriterBuilder interface { Family(name []byte, val []byte) WriterBuilder Time(ts time.Time) WriterBuilder @@ -36,6 +37,7 @@ type WriterBuilder interface { Build() (Writer, error) } +// Writer allow ingesting data into a tsdb. type Writer interface { IndexWriter Write() (GlobalItemID, error) @@ -47,7 +49,7 @@ var _ WriterBuilder = (*writerBuilder)(nil) type writerBuilder struct { series *seriesSpan - block BlockDelegate + block blockDelegate values []struct { family []byte val []byte @@ -84,26 +86,25 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder { } var ( - ErrNoTime = errors.New("no time specified") - ErrNoVal = errors.New("no value specified") + errNoTime = errors.New("no time specified") + errNoVal = errors.New("no value specified") + errDuplicatedFamily = errors.New("duplicated family") ) -var ErrDuplicatedFamily = errors.New("duplicated family") - func (w *writerBuilder) Build() (Writer, error) { if w.block == nil { - return nil, errors.WithMessagef(ErrNoTime, "ts:%v", w.ts) + return nil, errors.WithMessagef(errNoTime, "ts:%v", w.ts) } if len(w.values) < 1 { - return nil, errors.WithStack(ErrNoVal) + return nil, errors.WithStack(errNoVal) } for i, value := range w.values { for j := i + 1; j < len(w.values); j++ { if value.family == nil && w.values[j].family == nil { - return nil, errors.Wrap(ErrDuplicatedFamily, "default family") + return nil, errors.Wrap(errDuplicatedFamily, "default family") } if bytes.Equal(value.family, w.values[j].family) { - return nil, errors.Wrapf(ErrDuplicatedFamily, "family:%s", value.family) + return nil, errors.Wrapf(errDuplicatedFamily, "family:%s", value.family) } } } @@ -133,7 +134,7 @@ var _ Writer = (*writer)(nil) type 
writer struct { ts time.Time - block BlockDelegate + block blockDelegate itemID *GlobalItemID columns []struct { family []byte diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index aac8e55..88acc8d 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -49,12 +49,17 @@ var ( zeroIntBytes = convert.Uint64ToBytes(0) ) +// AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity. var AnyEntry = Entry(nil) +// Entry is an element in an Entity. type Entry []byte +// Entity denotes an identity of a Series. +// It defined by Stream or Measure schema. type Entity []Entry +// Marshal encodes an Entity to bytes. func (e Entity) Marshal() []byte { data := make([][]byte, len(e)) for i, entry := range e { @@ -63,18 +68,21 @@ func (e Entity) Marshal() []byte { return bytes.Join(data, nil) } +// Prepend inserts an Entry before the first Entry as the prefix. func (e Entity) Prepend(entry Entry) Entity { d := e d = append(Entity{entry}, d...) return d } +// Copy an Entity deeply. func (e Entity) Copy() Entity { a := make(Entity, len(e)) copy(a, e) return a } +// NewEntity return an Entity with an fixed length. func NewEntity(length int) Entity { e := make(Entity, length) for i := 0; i < length; i++ { @@ -83,18 +91,23 @@ func NewEntity(length int) Entity { return e } +// EntityValue represents the value of a tag which is a part of an entity. type EntityValue *modelv1.TagValue +// EntityValueToEntry transforms EntityValue to Entry. func EntityValueToEntry(ev EntityValue) (Entry, error) { return pbv1.MarshalTagValue(ev) } +// EntityValues is the encoded Entity. type EntityValues []EntityValue +// Prepend inserts an EntityValue before the first EntityValue as the prefix. func (evs EntityValues) Prepend(scope EntityValue) EntityValues { return append(EntityValues{scope}, evs...) } +// Encode EntityValues to tag values. 
func (evs EntityValues) Encode() (result []*modelv1.TagValue) { for _, v := range evs { result = append(result, v) @@ -102,6 +115,7 @@ func (evs EntityValues) Encode() (result []*modelv1.TagValue) { return } +// ToEntity transforms EntityValues to Entity. func (evs EntityValues) ToEntity() (result Entity, err error) { for _, v := range evs { entry, errMarshal := EntityValueToEntry(v) @@ -113,6 +127,7 @@ func (evs EntityValues) ToEntity() (result Entity, err error) { return } +// String outputs the string represent of an EntityValue. func (evs EntityValues) String() string { var strBuilder strings.Builder vv := evs.Encode() @@ -125,6 +140,7 @@ func (evs EntityValues) String() string { return strBuilder.String() } +// DecodeEntityValues decodes tag values to EntityValues. func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues) { for _, tv := range tvv { result = append(result, tv) @@ -132,14 +148,17 @@ func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues) { return } +// StrValue returns an EntityValue which wraps a string value. func StrValue(v string) EntityValue { return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: v}}} } +// Int64Value returns an EntityValue which wraps a int64 value. func Int64Value(v int64) EntityValue { return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: v}}} } +// MarshalEntityValues encodes EntityValues to bytes. func MarshalEntityValues(evs EntityValues) ([]byte, error) { data := &modelv1.TagFamilyForWrite{} for _, v := range evs { @@ -148,6 +167,7 @@ func MarshalEntityValues(evs EntityValues) ([]byte, error) { return proto.Marshal(data) } +// UnmarshalEntityValues decodes EntityValues from bytes. 
func UnmarshalEntityValues(evs []byte) (result EntityValues, err error) { data := &modelv1.TagFamilyForWrite{} result = make(EntityValues, len(data.Tags)) @@ -160,6 +180,8 @@ func UnmarshalEntityValues(evs []byte) (result EntityValues, err error) { return } +// Path denotes a expression to match a Series. +// It supports the fuzzy matching more than EQ by setting an entry to AnyEntry. type Path struct { prefix []byte seekKey []byte @@ -169,7 +191,8 @@ type Path struct { offset int } -func NewPath(entries []Entry) Path { +// NewPath return a Path with a matching expression. +func NewPath(matchingExpression []Entry) Path { p := Path{ seekKey: make([]byte, 0), mask: make([]byte, 0), @@ -177,7 +200,7 @@ func NewPath(entries []Entry) Path { } var encounterAny bool - for _, e := range entries { + for _, e := range matchingExpression { if e == nil { encounterAny = true p.mask = append(p.mask, zeroIntBytes...) @@ -207,7 +230,7 @@ func (p *Path) extractPrefix() { } } -func (p Path) Prepend(entry Entry) Path { +func (p Path) prepend(entry Entry) Path { e := Hash(entry) p.template = prepend(p.template, e) p.offset += len(e) @@ -223,6 +246,7 @@ func prepend(src []byte, entry []byte) []byte { return dst } +// SeriesDatabase allows retrieving series. 
type SeriesDatabase interface { observability.Observable io.Closer @@ -233,9 +257,9 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID - span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) - create(ctx context.Context, ts time.Time) (BlockDelegate, error) - block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) + span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) + create(ctx context.Context, ts time.Time) (blockDelegate, error) + block(ctx context.Context, id GlobalItemID) (blockDelegate, error) } var ( @@ -272,7 +296,7 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { return newSeries(s.context(), id, series, s), nil } -func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) { +func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (blockDelegate, error) { seg := s.segCtrl.get(id.segID) if seg == nil { return nil, nil @@ -410,8 +434,8 @@ func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) { return result, err } -func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) { - result := make([]BlockDelegate, 0) +func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]blockDelegate, error) { + result := make([]blockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { dd, err := s.blockController.span(ctx, timeRange) if err != nil { @@ -425,7 +449,7 @@ func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]B return result, nil } -func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, error) { +func (s *seriesDB) create(ctx context.Context, ts time.Time) (blockDelegate, error) { s.Lock() defer s.Unlock() timeRange := timestamp.NewInclusiveTimeRange(ts, ts) @@ -508,10 +532,12 @@ func HashEntity(entity Entity) []byte { return result } +// SeriesID transforms Entity 
to common.SeriesID. func SeriesID(entity Entity) common.SeriesID { return common.SeriesID(convert.Hash(HashEntity(entity))) } +// Hash encode Entry to 8 bytes. func Hash(entry []byte) []byte { return convert.Uint64ToBytes(convert.Hash(entry)) } @@ -520,6 +546,7 @@ func bytesToSeriesID(data []byte) common.SeriesID { return common.SeriesID(convert.BytesToUint64(data)) } +// SeriesList is a collection of Series. type SeriesList []Series func (a SeriesList) Len() int { @@ -534,6 +561,7 @@ func (a SeriesList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +// Merge other SeriesList with this one to create a new SeriesList. func (a SeriesList) Merge(other SeriesList) SeriesList { if len(other) == 0 { return a diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go index 4d232f5..129f320 100644 --- a/banyand/tsdb/seriesdb_test.go +++ b/banyand/tsdb/seriesdb_test.go @@ -342,7 +342,7 @@ func TestNewPath(t *testing.T) { t.Run(tt.name, func(t *testing.T) { got := NewPath(tt.entity) if tt.scope != nil { - got = got.Prepend(tt.scope) + got = got.prepend(tt.scope) } tester.Equal(tt.want, got) }) diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index f89b2d8..07b89a7 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -54,6 +54,7 @@ type shard struct { id common.ShardID } +// OpenShard returns an existed Shard or create a new one if not existed. func OpenShard(ctx context.Context, id common.ShardID, root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int, ) (Shard, error) { @@ -178,8 +179,10 @@ func (s *shard) Close() (err error) { return err } +// IntervalUnit denotes the unit of a time point. type IntervalUnit int +// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario. const ( HOUR IntervalUnit = iota DAY @@ -195,12 +198,13 @@ func (iu IntervalUnit) String() string { panic("invalid interval unit") } +// IntervalRule defines a length of two points in time. 
type IntervalRule struct { Unit IntervalUnit Num int } -func (ir IntervalRule) NextTime(current time.Time) time.Time { +func (ir IntervalRule) nextTime(current time.Time) time.Time { switch ir.Unit { case HOUR: return current.Add(time.Hour * time.Duration(ir.Num)) @@ -210,17 +214,7 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time { panic("invalid interval unit") } -func (ir IntervalRule) PreviousTime(current time.Time) time.Time { - switch ir.Unit { - case HOUR: - return current.Add(-time.Hour * time.Duration(ir.Num)) - case DAY: - return current.AddDate(0, 0, -ir.Num) - } - panic("invalid interval unit") -} - -func (ir IntervalRule) EstimatedDuration() time.Duration { +func (ir IntervalRule) estimatedDuration() time.Duration { switch ir.Unit { case HOUR: return time.Hour * time.Duration(ir.Num) @@ -236,7 +230,7 @@ type parser interface { func loadSections(root string, parser parser, intervalRule IntervalRule, loadFn func(start, end time.Time) error) error { var startTimeLst []time.Time - if err := WalkDir( + if err := walkDir( root, segPathPrefix, func(suffix string) error { @@ -255,7 +249,7 @@ func loadSections(root string, parser parser, intervalRule IntervalRule, loadFn if i < len(startTimeLst)-1 { end = startTimeLst[i+1] } else { - end = intervalRule.NextTime(start) + end = intervalRule.nextTime(start) } if err := loadFn(start, end); err != nil { return err diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 70546db..8f6ad3a 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -15,6 +15,12 @@ // specific language governing permissions and limitations // under the License. +// Package tsdb implements a time-series-based storage engine. +// It provides: +// - Partition data based on a time axis. +// - Sharding data based on a series id which represents a unique entity of stream/measure +// - Retrieving data based on index.Filter. +// - Cleaning expired data, or the data retention. 
package tsdb import ( @@ -55,23 +61,27 @@ const ( ) var ( - ErrInvalidShardID = errors.New("invalid shard id") - ErrOpenDatabase = errors.New("fails to open the database") + errInvalidShardID = errors.New("invalid shard id") + errOpenDatabase = errors.New("fails to open the database") optionsKey = contextOptionsKey{} ) type contextOptionsKey struct{} +// Supplier allows getting a tsdb's runtime. type Supplier interface { SupplyTSDB() Database } + +// Database allows listing and getting shard details. type Database interface { io.Closer Shards() []Shard Shard(id common.ShardID) (Shard, error) } +// Shard allows accessing data of tsdb. type Shard interface { io.Closer ID() common.ShardID @@ -84,6 +94,7 @@ type Shard interface { var _ Database = (*database)(nil) +// DatabaseOpts wraps options to create a tsdb. type DatabaseOpts struct { EncodingMethod EncodingMethod Location string @@ -97,14 +108,18 @@ type DatabaseOpts struct { EnableGlobalIndex bool } +// EncodingMethod wraps encoder/decoder pools to flush/compact data on disk. type EncodingMethod struct { EncoderPool encoding.SeriesEncoderPool DecoderPool encoding.SeriesDecoderPool } type ( + // SectionID is the kind of a block/segment. SectionID uint32 - BlockID struct { + + // BlockID is the identity of a block in a shard. + BlockID struct { SegID SectionID BlockID SectionID } @@ -114,6 +129,7 @@ func (b BlockID) String() string { return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), parseSuffix(b.BlockID)) } +// GenerateInternalID returns a identity of a section(segment or block) based on IntervalRule. func GenerateInternalID(unit IntervalUnit, suffix int) SectionID { return SectionID(unit)<<31 | ((SectionID(suffix) << 1) >> 1) } @@ -131,11 +147,14 @@ func readSectionID(data []byte, offset int) (SectionID, int) { return SectionID(convert.BytesToUint32(data[offset:end])), end } +// BlockState is a sample of a block's runtime state. 
type BlockState struct { TimeRange timestamp.TimeRange ID BlockID Closed bool } + +// ShardState is a sample of a shard's runtime state. type ShardState struct { Blocks []BlockState OpenBlocks []BlockID @@ -159,7 +178,7 @@ func (d *database) Shards() []Shard { func (d *database) Shard(id common.ShardID) (Shard, error) { if uint(id) >= uint(len(d.sLst)) { - return nil, ErrInvalidShardID + return nil, errInvalidShardID } return d.sLst[id], nil } @@ -175,24 +194,26 @@ func (d *database) Close() error { return err } +// OpenDatabase returns a new tsdb runtime. This constructor will create a new database if it's absent, +// or load an existing one. func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil { - return nil, errors.Wrap(ErrOpenDatabase, "encoding method is absent") + return nil, errors.Wrap(errOpenDatabase, "encoding method is absent") } if _, err := mkdir(opts.Location); err != nil { return nil, err } if opts.SegmentInterval.Num == 0 { - return nil, errors.Wrap(ErrOpenDatabase, "segment interval is absent") + return nil, errors.Wrap(errOpenDatabase, "segment interval is absent") } if opts.BlockInterval.Num == 0 { - return nil, errors.Wrap(ErrOpenDatabase, "block interval is absent") + return nil, errors.Wrap(errOpenDatabase, "block interval is absent") } - if opts.BlockInterval.EstimatedDuration() > opts.SegmentInterval.EstimatedDuration() { - return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size") + if opts.BlockInterval.estimatedDuration() > opts.SegmentInterval.estimatedDuration() { + return nil, errors.Wrapf(errOpenDatabase, "the block size is bigger than the segment size") } if opts.TTL.Num == 0 { - return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent") + return nil, errors.Wrap(errOpenDatabase, "ttl is absent") } db := &database{ location: opts.Location, @@ -242,7 +263,7 @@ func loadDatabase(ctx 
context.Context, db *database) (Database, error) { // TODO: open the manifest file db.Lock() defer db.Unlock() - err := WalkDir(db.location, shardPathPrefix, func(suffix string) error { + err := walkDir(db.location, shardPathPrefix, func(suffix string) error { shardID, err := strconv.Atoi(suffix) if err != nil { return err @@ -281,9 +302,9 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { return db, nil } -type WalkFn func(suffix string) error +type walkFn func(suffix string) error -func WalkDir(root, prefix string, walkFn WalkFn) error { +func walkDir(root, prefix string, wf walkFn) error { files, err := os.ReadDir(root) if err != nil { return errors.Wrapf(err, "failed to walk the database path: %s", root) @@ -293,7 +314,7 @@ func WalkDir(root, prefix string, walkFn WalkFn) error { continue } segs := strings.Split(f.Name(), "-") - errWalk := walkFn(segs[len(segs)-1]) + errWalk := wf(segs[len(segs)-1]) if errWalk != nil { return errors.WithMessagef(errWalk, "failed to load: %s", f.Name()) } diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go index 1b2f979..daa2859 100644 --- a/pkg/schema/metadata.go +++ b/pkg/schema/metadata.go @@ -88,7 +88,7 @@ type Resource interface { type ResourceSupplier interface { OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) (Resource, error) - ResourceSchema(repo metadata.Repo, metdata *commonv1.Metadata) (ResourceSchema, error) + ResourceSchema(metdata *commonv1.Metadata) (ResourceSchema, error) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) } @@ -287,7 +287,7 @@ func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, erro return nil, errors.WithMessagef(err, "create unknown group:%s", metadata.Group) } } - stm, err := sr.resourceSupplier.ResourceSchema(sr.metadata, metadata) + stm, err := sr.resourceSupplier.ResourceSchema(metadata) if err != nil { return nil, errors.WithMessage(err, "fails to get the resource") } diff --git a/pkg/test/setup/setup.go 
b/pkg/test/setup/setup.go index 4235604..e8b3063 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -82,7 +82,7 @@ func modules(flags []string) func() { measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) // Init `Query` module - q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc) httpServer := http.NewService()
