This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b61ab29  Bucket strategy managment and segment controller (#70)
b61ab29 is described below

commit b61ab295c38c672ca7aa60e308d0d64037272766
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jan 24 12:16:18 2022 +0800

    Bucket strategy managment and segment controller (#70)
    
    * init bucket
    Add segment controller
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Update banyand/tsdb/bucket/strategy.go
    
    Co-authored-by: Jiajing LU <[email protected]>
    
    * Rename variable
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    * Remove outter loop from strategy
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    Co-authored-by: Jiajing LU <[email protected]>
---
 banyand/measure/query_test.go                      |   2 +-
 banyand/measure/write.go                           |   2 +-
 banyand/query/processor_test.go                    |   9 +-
 banyand/stream/stream_query.go                     |   2 +-
 banyand/stream/stream_query_test.go                |  21 +-
 banyand/stream/stream_write.go                     |   2 +-
 banyand/tsdb/block.go                              |  14 +-
 .../logger.go => banyand/tsdb/bucket/bucket.go     |  39 +--
 .../tsdb/bucket/bucket_suite_test.go               |  47 ++--
 banyand/tsdb/bucket/strategy.go                    | 127 ++++++++++
 banyand/tsdb/bucket/strategy_test.go               | 141 +++++++++++
 banyand/tsdb/indexdb.go                            |  45 ++--
 banyand/tsdb/segment.go                            |  74 ++++--
 banyand/tsdb/series.go                             |  59 ++++-
 banyand/tsdb/series_seek_sort.go                   |   2 +-
 banyand/tsdb/series_test.go                        | 132 ++++++++++
 banyand/tsdb/seriesdb.go                           |  35 ++-
 banyand/tsdb/shard.go                              | 266 +++++++++++++++++----
 banyand/tsdb/shard_test.go                         |  70 ++++++
 banyand/tsdb/tsdb.go                               |  55 +++--
 .../logger.go => banyand/tsdb/tsdb_suite_test.go   |  46 ++--
 banyand/tsdb/tsdb_test.go                          |   2 +-
 pkg/logger/logger.go                               |  12 +
 pkg/query/logical/common.go                        |   4 +-
 pkg/query/logical/plan_indexscan_local.go          |   2 +-
 25 files changed, 944 insertions(+), 266 deletions(-)

diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 31638db..60b8bf2 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -37,7 +37,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) {
        r.NoError(err)
        series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
        r.NoError(err)
-       seriesSpan, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour))
+       seriesSpan, err := 
series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
        defer func(seriesSpan tsdb.SeriesSpan) {
                _ = seriesSpan.Close()
        }(seriesSpan)
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b1dc724..b0f9854 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -78,7 +78,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey 
[]byte, value *mea
                return err
        }
        t := value.GetTimestamp().AsTime()
-       wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+       wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index dae4951..d3a659a 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -377,8 +377,13 @@ func TestQueryProcessor(t *testing.T) {
                        singleTester.NoError(err)
                        singleTester.NotNil(msg)
                        // TODO: better error response
-                       singleTester.NotNil(msg.Data())
-                       singleTester.Len(msg.Data(), tt.wantLen)
+                       var dataLen int
+                       if msg.Data() == nil {
+                               dataLen = 0
+                       } else {
+                               dataLen = len(msg.Data().([]*streamv1.Element))
+                       }
+                       singleTester.Equal(dataLen, tt.wantLen)
                        if tt.checker != nil {
                                
singleTester.True(tt.checker(msg.Data().([]*streamv1.Element)))
                        }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index dfca4f4..6913b4a 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -20,8 +20,8 @@ package stream
 import (
        "io"
 
-       "github.com/golang/protobuf/proto"
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
diff --git a/banyand/stream/stream_query_test.go 
b/banyand/stream/stream_query_test.go
index f77cfd6..2533ef4 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -20,7 +20,6 @@ package stream
 import (
        "bytes"
        "embed"
-       _ "embed"
        "encoding/base64"
        "encoding/json"
        "fmt"
@@ -107,7 +106,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "all",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -137,7 +136,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "time range",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 
1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -164,7 +163,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "find series by service_id and instance_id",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.Entry("webapp_id"), 
tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -183,7 +182,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "find a series",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.Entry("webapp_id"), 
tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -197,7 +196,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        builder.Filter(&databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -243,7 +242,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "order by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        
builder.OrderByIndex(&databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -284,7 +283,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -331,7 +330,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter and sort by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -379,7 +378,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by several conditions",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -442,7 +441,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by several conditions, sort by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: tsdb.NewTimeRangeDuration(baseTime, 
1*time.Hour),
+                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index d35d088..7fef502 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -71,7 +71,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey 
[]byte, value *stre
                return err
        }
        t := value.GetTimestamp().AsTime()
-       wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
+       wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 956bf13..c5a2b98 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -23,11 +23,11 @@ import (
        "time"
 
        "github.com/dgraph-io/ristretto/z"
-       "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/kv"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/lsm"
@@ -63,16 +63,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b 
*block, err error) {
                path:      opts.path,
                ref:       z.NewCloser(1),
                startTime: time.Now(),
-       }
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger != nil {
-               if pl, ok := parentLogger.(*logger.Logger); ok {
-                       b.l = pl.Named("block")
-               }
+               l:         logger.Fetch(ctx, "block"),
        }
        encodingMethodObject := ctx.Value(encodingMethodKey)
        if encodingMethodObject == nil {
-               return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to 
create a block")
+               encodingMethodObject = EncodingMethod{
+                       EncoderPool: encoding.NewPlainEncoderPool(0),
+                       DecoderPool: encoding.NewPlainDecoderPool(0),
+               }
        }
        encodingMethod := encodingMethodObject.(EncodingMethod)
        if b.store, err = kv.OpenTimeSeriesStore(
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket.go
index 36896ed..82a76ec 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -15,39 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package logger
+package bucket
 
-import (
-       "strings"
-
-       "github.com/pkg/errors"
-       "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
-
-type contextKey struct{}
-
-// Logging is the config info
-type Logging struct {
-       Env   string
-       Level string
+type Controller interface {
+       Current() Reporter
+       Next() (Reporter, error)
 }
 
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-       module string
-       *zerolog.Logger
+type Status struct {
+       Capacity int
+       Volume   int
 }
 
-func (l *Logger) Named(name string) *Logger {
-       module := strings.Join([]string{l.module, name}, ".")
-       subLogger := root.Logger.With().Str("module", module).Logger()
-       return &Logger{module: module, Logger: &subLogger}
-}
+type Channel chan Status
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-       SetLogger(*Logger)
+type Reporter interface {
+       Report() Channel
 }
diff --git a/pkg/logger/logger.go b/banyand/tsdb/bucket/bucket_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/bucket/bucket_suite_test.go
index 36896ed..0ab7ff8 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -14,40 +14,25 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package logger
+//
+package bucket_test
 
 import (
-       "strings"
-
-       "github.com/pkg/errors"
-       "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+       "testing"
 
-type contextKey struct{}
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
 
-// Logging is the config info
-type Logging struct {
-       Env   string
-       Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-       module string
-       *zerolog.Logger
-}
-
-func (l *Logger) Named(name string) *Logger {
-       module := strings.Join([]string{l.module, name}, ".")
-       subLogger := root.Logger.With().Str("module", module).Logger()
-       return &Logger{module: module, Logger: &subLogger}
-}
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-       SetLogger(*Logger)
+func TestBucket(t *testing.T) {
+       RegisterFailHandler(Fail)
+       BeforeSuite(func() {
+               Expect(logger.Init(logger.Logging{
+                       Env:   "dev",
+                       Level: "debug",
+               })).Should(Succeed())
+       })
+       RunSpecs(t, "Bucket Suite")
 }
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
new file mode 100644
index 0000000..339aa4d
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy.go
@@ -0,0 +1,127 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bucket
+
+import (
+       "github.com/pkg/errors"
+       "go.uber.org/multierr"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var (
+       ErrInvalidParameter = errors.New("parameters are invalid")
+       ErrNoMoreBucket     = errors.New("no more buckets")
+)
+
+type Ratio float64
+
+type Strategy struct {
+       optionsErr error
+       ratio      Ratio
+       ctrl       Controller
+       current    Reporter
+       next       Reporter
+       logger     *logger.Logger
+       stopCh     chan struct{}
+}
+
+type StrategyOptions func(*Strategy)
+
+func WithNextThreshold(ratio Ratio) StrategyOptions {
+       return func(s *Strategy) {
+               if ratio > 1.0 {
+                       s.optionsErr = multierr.Append(s.optionsErr,
+                               errors.Wrapf(ErrInvalidParameter, "ratio %v is 
more than 1.0", ratio))
+                       return
+               }
+               s.ratio = ratio
+       }
+}
+
+func WithLogger(logger *logger.Logger) StrategyOptions {
+       return func(s *Strategy) {
+               s.logger = logger
+       }
+}
+
+func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, 
error) {
+       if ctrl == nil {
+               return nil, errors.Wrap(ErrInvalidParameter, "controller is 
absent")
+       }
+       strategy := &Strategy{
+               ctrl:   ctrl,
+               ratio:  0.8,
+               stopCh: make(chan struct{}),
+       }
+       for _, opt := range options {
+               opt(strategy)
+       }
+       if strategy.optionsErr != nil {
+               return nil, strategy.optionsErr
+       }
+       if strategy.logger == nil {
+               strategy.logger = logger.GetLogger("bucket-strategy")
+       }
+       return strategy, nil
+}
+
+func (s *Strategy) Run() {
+       reset := func() {
+               for s.current == nil {
+                       s.current = s.ctrl.Current()
+               }
+               s.next = nil
+       }
+       reset()
+       go func(s *Strategy) {
+               var err error
+       bucket:
+               c := s.current.Report()
+               for {
+                       select {
+                       case status, closed := <-c:
+                               if !closed {
+                                       reset()
+                                       goto bucket
+                               }
+                               ratio := Ratio(status.Volume) / 
Ratio(status.Capacity)
+                               if ratio >= s.ratio && s.next == nil {
+                                       s.next, err = s.ctrl.Next()
+                                       if errors.Is(err, ErrNoMoreBucket) {
+                                               return
+                                       }
+                                       if err != nil {
+                                               s.logger.Err(err).Msg("failed 
to create the next bucket")
+                                       }
+                               }
+                               if ratio >= 1.0 {
+                                       s.current = s.next
+                                       s.next = nil
+                                       goto bucket
+                               }
+                       case <-s.stopCh:
+                               return
+                       }
+               }
+       }(s)
+}
+
+func (s *Strategy) Close() {
+       close(s.stopCh)
+}
diff --git a/banyand/tsdb/bucket/strategy_test.go 
b/banyand/tsdb/bucket/strategy_test.go
new file mode 100644
index 0000000..3a302ce
--- /dev/null
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket_test
+
+import (
+       "sync"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+)
+
+var _ = Describe("Strategy", func() {
+       Context("Applying the strategy", func() {
+               var strategy *bucket.Strategy
+               It("uses the golden settings", func() {
+                       ctrl := newController(2, 1, 10)
+                       var err error
+                       strategy, err = bucket.NewStrategy(ctrl)
+                       Expect(err).NotTo(HaveOccurred())
+                       strategy.Run()
+                       Eventually(ctrl.isFull).Should(BeTrue())
+               })
+               It("never reaches the limit", func() {
+                       ctrl := newController(1, 0, 10)
+                       var err error
+                       strategy, err = bucket.NewStrategy(ctrl)
+                       Expect(err).NotTo(HaveOccurred())
+                       strategy.Run()
+                       Consistently(ctrl.isFull).ShouldNot(BeTrue())
+               })
+               It("exceeds the limit", func() {
+                       ctrl := newController(2, 3, 10)
+                       var err error
+                       strategy, err = bucket.NewStrategy(ctrl)
+                       Expect(err).NotTo(HaveOccurred())
+                       strategy.Run()
+                       Eventually(ctrl.isFull).Should(BeTrue())
+               })
+               It("'s first step exceeds the limit", func() {
+                       ctrl := newController(2, 15, 10)
+                       var err error
+                       strategy, err = bucket.NewStrategy(ctrl)
+                       Expect(err).NotTo(HaveOccurred())
+                       strategy.Run()
+                       Eventually(ctrl.isFull).Should(BeTrue())
+               })
+               AfterEach(func() {
+                       if strategy != nil {
+                               strategy.Close()
+                       }
+               })
+       })
+       Context("Invalid parameter", func() {
+               It("passes a ratio > 1.0", func() {
+                       ctrl := newController(2, 3, 10)
+                       _, err := bucket.NewStrategy(ctrl, 
bucket.WithNextThreshold(1.1))
+                       Expect(err).To(MatchError(bucket.ErrInvalidParameter))
+               })
+       })
+})
+
+type controller struct {
+       maxBuckets  int
+       usedBuckets int
+
+       reporter *reporter
+       capacity int
+       step     int
+       mux      sync.RWMutex
+}
+
+func newController(maxBuckets, step, capacity int) *controller {
+       ctrl := &controller{step: step, maxBuckets: maxBuckets, capacity: 
capacity}
+       ctrl.newReporter()
+       return ctrl
+}
+
+func (c *controller) Next() (bucket.Reporter, error) {
+       c.mux.Lock()
+       defer c.mux.Unlock()
+       if c.usedBuckets >= c.maxBuckets {
+               return nil, bucket.ErrNoMoreBucket
+       }
+       c.usedBuckets++
+       c.newReporter()
+       return c.reporter, nil
+}
+
+func (c *controller) Current() bucket.Reporter {
+       c.mux.RLock()
+       defer c.mux.RUnlock()
+       return c.reporter
+}
+
+func (c *controller) newReporter() {
+       c.reporter = &reporter{step: c.step, capacity: c.capacity}
+}
+
+func (c *controller) isFull() bool {
+       c.mux.RLock()
+       defer c.mux.RUnlock()
+       return c.usedBuckets >= c.maxBuckets
+}
+
+type reporter struct {
+       capacity int
+       volume   int
+       step     int
+}
+
+func (r *reporter) Report() bucket.Channel {
+       ch := make(bucket.Channel)
+       go func() {
+               for i := 0; i < r.capacity; i++ {
+                       r.volume += r.step
+                       ch <- bucket.Status{
+                               Capacity: r.capacity,
+                               Volume:   r.volume,
+                       }
+               }
+               close(ch)
+       }()
+       return ch
+}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index dcd6123..741ba85 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -51,7 +51,7 @@ var _ IndexDatabase = (*indexDB)(nil)
 
 type indexDB struct {
        shardID common.ShardID
-       lst     []*segment
+       segCtrl *segmentController
 }
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
@@ -60,36 +60,38 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, 
error) {
        if err != nil {
                return nil, err
        }
-       err = i.lst[0].globalIndex.GetAll(f, func(rawBytes []byte) error {
-               id := &GlobalItemID{}
-               errUnMarshal := id.UnMarshal(rawBytes)
-               if errUnMarshal != nil {
-                       return errUnMarshal
+       for _, s := range i.segCtrl.segments() {
+               err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
+                       id := &GlobalItemID{}
+                       errUnMarshal := id.UnMarshal(rawBytes)
+                       if errUnMarshal != nil {
+                               return errUnMarshal
+                       }
+                       result = append(result, *id)
+                       return nil
+               })
+               if err == kv.ErrKeyNotFound {
+                       return result, nil
                }
-               result = append(result, *id)
-               return nil
-       })
-       if err == kv.ErrKeyNotFound {
-               return result, nil
        }
        return result, err
 }
 
 func (i *indexDB) WriterBuilder() IndexWriterBuilder {
-       return newIndexWriterBuilder(i.lst)
+       return newIndexWriterBuilder(i.segCtrl)
 }
 
-func newIndexDatabase(_ context.Context, id common.ShardID, lst []*segment) 
(IndexDatabase, error) {
+func newIndexDatabase(_ context.Context, id common.ShardID, segCtrl 
*segmentController) (IndexDatabase, error) {
        return &indexDB{
                shardID: id,
-               lst:     lst,
+               segCtrl: segCtrl,
        }, nil
 }
 
 var _ IndexWriterBuilder = (*indexWriterBuilder)(nil)
 
 type indexWriterBuilder struct {
-       segments     []*segment
+       segCtrl      *segmentController
        ts           time.Time
        seg          *segment
        globalItemID *GlobalItemID
@@ -97,12 +99,11 @@ type indexWriterBuilder struct {
 
 func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
        i.ts = ts
-       for _, s := range i.segments {
-               if s.contains(ts) {
-                       i.seg = s
-                       break
-               }
+       segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false))
+       if len(segs) != 1 {
+               return i
        }
+       i.seg = segs[0]
        return i
 }
 
@@ -125,9 +126,9 @@ func (i *indexWriterBuilder) Build() (IndexWriter, error) {
        }, nil
 }
 
-func newIndexWriterBuilder(segments []*segment) IndexWriterBuilder {
+func newIndexWriterBuilder(segCtrl *segmentController) IndexWriterBuilder {
        return &indexWriterBuilder{
-               segments: segments,
+               segCtrl: segCtrl,
        }
 }
 
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 6a8305e..e82767a 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -19,42 +19,74 @@ package tsdb
 
 import (
        "context"
+       "strconv"
        "sync"
        "time"
 
        "github.com/apache/skywalking-banyandb/banyand/kv"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 type segment struct {
-       path string
+       id     uint16
+       path   string
+       suffix string
 
        lst         []*block
        globalIndex kv.Store
        sync.Mutex
-       l         *logger.Logger
-       startTime time.Time
-       endTime   time.Time
+       l              *logger.Logger
+       reporterStopCh chan struct{}
+       TimeRange
 }
 
-func (s *segment) contains(ts time.Time) bool {
-       greaterAndEqualStart := s.startTime.Equal(ts) || s.startTime.Before(ts)
-       if s.endTime.IsZero() {
-               return greaterAndEqualStart
+func (s *segment) Report() bucket.Channel {
+       ch := make(bucket.Channel)
+       interval := s.Duration() >> 4
+       if interval < 100*time.Millisecond {
+               interval = 100 * time.Millisecond
        }
-       return greaterAndEqualStart && s.endTime.After(ts)
+       go func() {
+               defer close(ch)
+               ticker := time.NewTicker(interval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-ticker.C:
+                               status := bucket.Status{
+                                       Capacity: int(s.End.UnixNano() - 
s.Start.UnixNano()),
+                                       Volume:   int(time.Now().UnixNano() - 
s.Start.UnixNano()),
+                               }
+                               ch <- status
+                               if status.Volume >= status.Capacity {
+                                       return
+                               }
+                       case <-s.reporterStopCh:
+                               return
+                       }
+               }
+       }()
+       return ch
 }
 
-func newSegment(ctx context.Context, path string) (s *segment, err error) {
-       s = &segment{
-               path:      path,
-               startTime: time.Now(),
+func openSegment(ctx context.Context, suffix, path string, intervalRule 
IntervalRule) (s *segment, err error) {
+       startTime, err := intervalRule.Unit.Parse(suffix)
+       if err != nil {
+               return nil, err
        }
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger != nil {
-               if pl, ok := parentLogger.(*logger.Logger); ok {
-                       s.l = pl.Named("segment")
-               }
+       suffixInteger, err := strconv.Atoi(suffix)
+       if err != nil {
+               return nil, err
+       }
+       id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 
4)
+       s = &segment{
+               id:             id,
+               path:           path,
+               suffix:         suffix,
+               l:              logger.Fetch(ctx, "segment"),
+               reporterStopCh: make(chan struct{}),
+               TimeRange:      NewTimeRange(startTime, 
intervalRule.NextTime(startTime), true, false),
        }
        indexPath, err := mkdir(globalIndexTemplate, path)
        if err != nil {
@@ -66,7 +98,8 @@ func newSegment(ctx context.Context, path string) (s 
*segment, err error) {
        loadBlock := func(path string) error {
                var b *block
                if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, 
s.l), blockOpts{
-                       path: path,
+                       segID: s.id,
+                       path:  path,
                }); err != nil {
                        return err
                }
@@ -77,7 +110,7 @@ func newSegment(ctx context.Context, path string) (s 
*segment, err error) {
                }
                return nil
        }
-       err = walkDir(path, blockPathPrefix, func(name, absolutePath string) 
error {
+       err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error 
{
                return loadBlock(absolutePath)
        })
        if err != nil {
@@ -103,4 +136,5 @@ func (s *segment) close() {
                b.close()
        }
        s.globalIndex.Close()
+       close(s.reporterStopCh)
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c9ec07b..5cc9805 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -34,6 +34,7 @@ import (
 var (
        ErrEmptySeriesSpan = errors.New("there is no data in such time range")
        ErrItemIDMalformed = errors.New("serialized item id is malformed")
+       ErrBlockAbsent     = errors.New("block is absent")
 )
 
 type GlobalItemID struct {
@@ -72,32 +73,63 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 }
 
 type TimeRange struct {
-       Start time.Time
-       End   time.Time
+       Start        time.Time
+       End          time.Time
+       IncludeStart bool
+       IncludeEnd   bool
 }
 
-func (t TimeRange) contains(unixNano uint64) bool {
+func (t TimeRange) Contains(unixNano uint64) bool {
        tp := time.Unix(0, int64(unixNano))
-       if tp.Equal(t.End) || tp.After(t.End) {
-               return false
+       if t.Start.Equal(tp) {
+               return t.IncludeStart
        }
-       return tp.Equal(t.Start) || tp.After(t.Start)
+       if t.End.Equal(tp) {
+               return t.IncludeEnd
+       }
+       return !tp.Before(t.Start) && !tp.After(t.End)
+}
+
+func (t TimeRange) Overlapping(other TimeRange) bool {
+       if t.Start.Equal(other.End) {
+               return t.IncludeStart && other.IncludeEnd
+       }
+       if other.Start.Equal(t.End) {
+               return t.IncludeEnd && other.IncludeStart
+       }
+       return !t.Start.After(other.End) && !other.Start.After(t.End)
+}
+
+func (t TimeRange) Duration() time.Duration {
+       return t.End.Sub(t.Start)
 }
 
-func NewTimeRange(Start, End time.Time) TimeRange {
+func NewInclusiveTimeRange(start, end time.Time) TimeRange {
        return TimeRange{
-               Start: Start,
-               End:   End,
+               Start:        start,
+               End:          end,
+               IncludeStart: true,
+               IncludeEnd:   true,
        }
 }
 
-func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) 
TimeRange {
+       return NewTimeRangeDuration(start, duration, true, true)
+}
+
+func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) 
TimeRange {
        return TimeRange{
-               Start: Start,
-               End:   Start.Add(Duration),
+               Start:        start,
+               End:          end,
+               IncludeStart: includeStart,
+               IncludeEnd:   includeEnd,
        }
 }
 
+func NewTimeRangeDuration(start time.Time, duration time.Duration, 
includeStart, includeEnd bool) TimeRange {
+       return NewTimeRange(start, start.Add(duration), includeStart, 
includeEnd)
+}
+
 type Series interface {
        ID() common.SeriesID
        Span(timeRange TimeRange) (SeriesSpan, error)
@@ -121,6 +153,9 @@ type series struct {
 
 func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
        b := s.blockDB.block(id)
+       if b == nil {
+               return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", 
id)
+       }
        return &item{
                data:     b.dataReader(),
                itemID:   id.ID,
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 6182d96..38b733b 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -56,7 +56,7 @@ func (s *seekerBuilder) buildSeries(conditions []condWithIRT) 
([]Iterator, error
 
 func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series 
[]Iterator, err error) {
        timeFilter := func(item Item) bool {
-               valid := s.seriesSpan.timeRange.contains(item.Time())
+               valid := s.seriesSpan.timeRange.Contains(item.Time())
                timeRange := s.seriesSpan.timeRange
                s.seriesSpan.l.Trace().
                        Times("time_range", []time.Time{timeRange.Start, 
timeRange.End}).
diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go
new file mode 100644
index 0000000..25a7df6
--- /dev/null
+++ b/banyand/tsdb/series_test.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+       "fmt"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+)
+
+var _ = Describe("Series", func() {
+       Context("TimeRange", func() {
+               Context("Contains", func() {
+                       verifyFn := func(start, end string, includeStart, 
includeEnd bool, ts string, expected bool) {
+                               startTime, _ := time.Parse("20060202", start)
+                               endTime, _ := time.Parse("20060202", end)
+                               tsTime, _ := time.Parse("20060202", ts)
+                               Expect(tsdb.NewTimeRange(startTime, endTime, 
includeStart, 
includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
+                       }
+                       DescribeTable("It's a exclusive range",
+                               func(start, end, ts string, expected bool) {
+                                       verifyFn(start, end, false, false, ts, 
expected)
+                               },
+                               Entry("is in the middle", "20220205", 
"20220107", "20220106", true),
+                               Entry("is at the lower", "20220205", 
"20220107", "20220105", false),
+                               Entry("is at the upper", "20220205", 
"20220107", "20220107", false),
+                               Entry("is before the lower", "20220205", 
"20220107", "20220104", false),
+                               Entry("is after the upper", "20220205", 
"20220107", "20220108", false),
+                       )
+                       DescribeTable("It's a inclusive range",
+                               func(start, end, ts string, expected bool) {
+                                       verifyFn(start, end, true, true, ts, 
expected)
+                               },
+                               Entry("is in the middle", "20220205", 
"20220107", "20220106", true),
+                               Entry("is at the lower", "20220205", 
"20220107", "20220105", true),
+                               Entry("is at the upper", "20220205", 
"20220107", "20220107", true),
+                               Entry("is before the lower", "20220205", 
"20220107", "20220104", false),
+                               Entry("is after the upper", "20220205", 
"20220107", "20220108", false),
+                       )
+                       DescribeTable("It's a inclusive lower and exclusive 
upper range",
+                               func(start, end, ts string, expected bool) {
+                                       verifyFn(start, end, true, false, ts, 
expected)
+                               },
+                               Entry("is in the middle", "20220205", 
"20220107", "20220106", true),
+                               Entry("is at the lower", "20220205", 
"20220107", "20220105", true),
+                               Entry("is at the upper", "20220205", 
"20220107", "20220107", false),
+                               Entry("is before the lower", "20220205", 
"20220107", "20220104", false),
+                               Entry("is after the upper", "20220205", 
"20220107", "20220108", false),
+                       )
+                       DescribeTable("It's a exclusive lower and inclusive 
upper range",
+                               func(start, end, ts string, expected bool) {
+                                       verifyFn(start, end, false, true, ts, 
expected)
+                               },
+                               Entry("is in the middle", "20220205", 
"20220107", "20220106", true),
+                               Entry("is at the lower", "20220205", 
"20220107", "20220105", false),
+                               Entry("is at the upper", "20220205", 
"20220107", "20220107", true),
+                               Entry("is before the lower", "20220205", 
"20220107", "20220104", false),
+                               Entry("is after the upper", "20220205", 
"20220107", "20220108", false),
+                       )
+               })
+               Context("Overlapping", func() {
+                       verifyFn := func(start1, end1, start2, end2 string, 
expected bool) {
+                               startTime1, _ := time.Parse("20060102", start1)
+                               endTime1, _ := time.Parse("20060102", end1)
+                               startTime2, _ := time.Parse("20060102", start2)
+                               endTime2, _ := time.Parse("20060102", end2)
+                               includes := []bool{true, false}
+
+                               for _, r1l := range includes {
+                                       for _, r1u := range includes {
+                                               for _, r2l := range includes {
+                                                       for _, r2u := range 
includes {
+                                                               
By(fmt.Sprintf("r1 lower:%v upper:%v. r1 lower:%v upper:%v", r1l, r1u, r2l, 
r2u), func() {
+                                                                       r1 := 
tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u)
+                                                                       r2 := 
tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u)
+                                                                       
Expect(r1.Overlapping(r2)).To(Equal(expected))
+                                                                       
Expect(r2.Overlapping(r1)).To(Equal(expected))
+                                                               })
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       DescribeTable("Each range type's behavior is identical",
+                               verifyFn,
+                               Entry("is no overlapping", "20220205", 
"20220107", "20220108", "20220112", false),
+                               Entry("is the two range are identical", 
"20220105", "20220107", "20220105", "20220107", true),
+                               Entry("is the one includes the other", 
"20220102", "20220107", "20220103", "20220106", true),
+                               Entry("is the one includes the other, the upper 
bounds are identical", "20220102", "20220107", "20220103", "20220107", true),
+                               Entry("is the one includes the other, the lower 
bounds are identical", "20220102", "20220107", "20220102", "20220106", true),
+                               Entry("is they have an intersection", 
"20220102", "20220105", "20220103", "20220106", true),
+                       )
+                       adjacentVerifyFn := func(include1, include2, expected 
bool) {
+                               startTime1, _ := time.Parse("20060102", 
"20210105")
+                               endTime1, _ := time.Parse("20060102", 
"20210107")
+                               startTime2, _ := time.Parse("20060102", 
"20210107")
+                               endTime2, _ := time.Parse("20060102", 
"20210109")
+                               r1 := tsdb.NewTimeRange(startTime1, endTime1, 
false, include1)
+                               r2 := tsdb.NewTimeRange(startTime2, endTime2, 
include2, false)
+                               Expect(r1.Overlapping(r2)).To(Equal(expected))
+                       }
+
+                       DescribeTable("They are adjacent",
+                               adjacentVerifyFn,
+                               Entry("is they are inclusive", true, true, 
true),
+                               Entry("is that range1 includes upper, but 
range2 excludes lower", true, false, false),
+                               Entry("is that range1 excludes upper, but 
range2 includes lower", false, true, false),
+                               Entry("is they are exclusive", false, false, 
false),
+                       )
+               })
+
+       })
+})
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index c2e9d5e..de269e6 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -106,7 +106,7 @@ type seriesDB struct {
        sync.Mutex
        l *logger.Logger
 
-       lst            []*segment
+       segCtrl        *segmentController
        seriesMetadata kv.Store
        sID            common.ShardID
 }
@@ -134,7 +134,11 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, 
error) {
 }
 
 func (s *seriesDB) block(id GlobalItemID) blockDelegate {
-       return s.lst[id.segID].lst[id.blockID].delegate()
+       seg := s.segCtrl.get(id.segID)
+       if seg == nil {
+               return nil
+       }
+       return seg.lst[id.blockID].delegate()
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -191,11 +195,13 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
        return result, err
 }
 
-func (s *seriesDB) span(_ TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange TimeRange) []blockDelegate {
        //TODO: return correct blocks
-       result := make([]blockDelegate, 0, len(s.lst[0].lst))
-       for _, b := range s.lst[0].lst {
-               result = append(result, b.delegate())
+       result := make([]blockDelegate, 0)
+       for _, s := range s.segCtrl.span(timeRange) {
+               for _, b := range s.lst {
+                       result = append(result, b.delegate())
+               }
        }
        return result
 }
@@ -205,23 +211,14 @@ func (s *seriesDB) context() context.Context {
 }
 
 func (s *seriesDB) Close() error {
-       for _, seg := range s.lst {
-               seg.close()
-       }
        return s.seriesMetadata.Close()
 }
 
-func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path 
string, segLst []*segment) (SeriesDatabase, error) {
+func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path 
string, segCtrl *segmentController) (SeriesDatabase, error) {
        sdb := &seriesDB{
-               sID: shardID,
-               lst: segLst,
-       }
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger == nil {
-               return nil, logger.ErrNoLoggerInContext
-       }
-       if pl, ok := parentLogger.(*logger.Logger); ok {
-               sdb.l = pl.Named("series_database")
+               sID:     shardID,
+               segCtrl: segCtrl,
+               l:       logger.Fetch(ctx, "series_database"),
        }
        var err error
        sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", 
kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 7b1b7b9..7eaf50f 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -23,21 +23,23 @@ import (
        "sync"
        "time"
 
+       "github.com/pkg/errors"
+
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-       sync.Mutex
-       id     common.ShardID
-       logger *logger.Logger
+       l  *logger.Logger
+       id common.ShardID
 
-       location       string
-       seriesDatabase SeriesDatabase
-       indexDatabase  IndexDatabase
-       lst            []*segment
+       seriesDatabase        SeriesDatabase
+       indexDatabase         IndexDatabase
+       segmentController     *segmentController
+       segmentManageStrategy *bucket.Strategy
 }
 
 func (s *shard) ID() common.ShardID {
@@ -52,66 +54,234 @@ func (s *shard) Index() IndexDatabase {
        return s.indexDatabase
 }
 
-func openShard(ctx context.Context, id common.ShardID, location string) 
(*shard, error) {
-       s := &shard{
-               id:       id,
-               location: location,
-       }
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger != nil {
-               if pl, ok := parentLogger.(*logger.Logger); ok {
-                       s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
-               }
+func OpenShard(ctx context.Context, id common.ShardID, root string, 
intervalRule IntervalRule) (Shard, error) {
+       path, err := mkdir(shardTemplate, root, int(id))
+       if err != nil {
+               return nil, errors.Wrapf(err, "make the directory of the shard 
%d ", int(id))
        }
-       loadSeg := func(path string) error {
-               seg, err := newSegment(ctx, path)
-               if err != nil {
-                       return err
-               }
-               {
-                       s.Lock()
-                       defer s.Unlock()
-                       s.lst = append(s.lst, seg)
-               }
-               s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
-               return nil
+       l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
+       l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a 
shard")
+       s := &shard{
+               id:                id,
+               segmentController: newSegmentController(path, intervalRule),
+               l:                 l,
        }
-       err := walkDir(location, segPathPrefix, func(_, absolutePath string) 
error {
-               s.logger.Info().Str("path", absolutePath).Msg("loading a 
segment")
-               return loadSeg(absolutePath)
-       })
+       shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
+       err = s.segmentController.open(shardCtx)
        if err != nil {
                return nil, err
        }
-       if len(s.lst) < 1 {
-               var segPath string
-               segPath, err = mkdir(segTemplate, location, 
time.Now().Format(segFormat))
-               if err != nil {
-                       return nil, err
-               }
-               s.logger.Info().Str("path", segPath).Msg("creating a new 
segment")
-               err = loadSeg(segPath)
-               if err != nil {
-                       return nil, err
-               }
-       }
-       seriesPath, err := mkdir(seriesTemplate, s.location)
+       seriesPath, err := mkdir(seriesTemplate, path)
        if err != nil {
                return nil, err
        }
-       sdb, err := newSeriesDataBase(ctx, s.id, seriesPath, s.lst)
+       sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, 
s.segmentController)
        if err != nil {
                return nil, err
        }
        s.seriesDatabase = sdb
-       idb, err := newIndexDatabase(ctx, s.id, s.lst)
+       idb, err := newIndexDatabase(shardCtx, s.id, s.segmentController)
        if err != nil {
                return nil, err
        }
        s.indexDatabase = idb
+       s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, 
bucket.WithLogger(s.l))
+       if err != nil {
+               return nil, err
+       }
+       s.segmentManageStrategy.Run()
        return s, nil
 }
 
 func (s *shard) Close() error {
+       s.segmentManageStrategy.Close()
+       s.segmentController.close()
        return s.seriesDatabase.Close()
 }
+
+type IntervalUnit int
+
+const (
+       DAY IntervalUnit = iota
+       MONTH
+       YEAR
+       MILLISECOND // only for testing
+)
+
+func (iu IntervalUnit) String() string {
+       switch iu {
+       case DAY:
+               return "day"
+       case MONTH:
+               return "month"
+       case YEAR:
+               return "year"
+       case MILLISECOND:
+               return "millis"
+
+       }
+       panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Format(tm time.Time) string {
+       switch iu {
+       case DAY:
+               return tm.Format(segDayFormat)
+       case MONTH:
+               return tm.Format(segMonthFormat)
+       case YEAR:
+               return tm.Format(segYearFormat)
+       case MILLISECOND:
+               return tm.Format(segMillisecondFormat)
+       }
+       panic("invalid interval unit")
+}
+
+func (iu IntervalUnit) Parse(value string) (time.Time, error) {
+       switch iu {
+       case DAY:
+               return time.Parse(segDayFormat, value)
+       case MONTH:
+               return time.Parse(segMonthFormat, value)
+       case YEAR:
+               return time.Parse(segYearFormat, value)
+       case MILLISECOND:
+               return time.Parse(segMillisecondFormat, value)
+       }
+       panic("invalid interval unit")
+}
+
+type IntervalRule struct {
+       Unit IntervalUnit
+       Num  int
+}
+
+func (ir IntervalRule) NextTime(current time.Time) time.Time {
+       switch ir.Unit {
+       case DAY:
+               return current.AddDate(0, 0, ir.Num)
+       case MONTH:
+               return current.AddDate(0, ir.Num, 0)
+       case YEAR:
+               return current.AddDate(ir.Num, 0, 0)
+       case MILLISECOND:
+               return current.Add(time.Millisecond * time.Duration(ir.Num))
+       }
+       panic("invalid interval unit")
+}
+
+type segmentController struct {
+       sync.RWMutex
+       location     string
+       intervalRule IntervalRule
+       lst          []*segment
+}
+
+func newSegmentController(location string, intervalRule IntervalRule) 
*segmentController {
+       return &segmentController{
+               location:     location,
+               intervalRule: intervalRule,
+       }
+}
+
+func (sc *segmentController) get(segID uint16) *segment {
+       sc.RLock()
+       defer sc.RUnlock()
+       last := len(sc.lst) - 1
+       for i := range sc.lst {
+               s := sc.lst[last-i]
+               if s.id == segID {
+                       return s
+               }
+       }
+       return nil
+}
+
+func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+       sc.RLock()
+       defer sc.RUnlock()
+       last := len(sc.lst) - 1
+       for i := range sc.lst {
+               s := sc.lst[last-i]
+               if s.Overlapping(timeRange) {
+                       ss = append(ss, s)
+               }
+       }
+       return ss
+}
+
+func (sc *segmentController) segments() (ss []*segment) {
+       sc.RLock()
+       defer sc.RUnlock()
+       r := make([]*segment, len(sc.lst))
+       copy(r, sc.lst)
+       return r
+}
+
+func (sc *segmentController) Current() bucket.Reporter {
+       sc.RLock()
+       defer sc.RUnlock()
+       now := time.Now()
+       for _, s := range sc.lst {
+               if s.suffix == sc.intervalRule.Unit.Format(now) {
+                       return s
+               }
+       }
+       // return the latest segment before now
+       if len(sc.lst) > 0 {
+               return sc.lst[len(sc.lst)-1]
+       }
+       return nil
+}
+
+func (sc *segmentController) Next() (bucket.Reporter, error) {
+       return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
+               sc.intervalRule.NextTime(time.Now())))
+}
+
+func (sc *segmentController) open(ctx context.Context) error {
+       err := walkDir(
+               sc.location,
+               segPathPrefix,
+               func(suffix, absolutePath string) error {
+                       _, err := sc.load(ctx, suffix, absolutePath)
+                       return err
+               })
+       if err != nil {
+               return err
+       }
+       if sc.Current() == nil {
+               _, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func (sc *segmentController) create(ctx context.Context, suffix string) 
(*segment, error) {
+       segPath, err := mkdir(segTemplate, sc.location, suffix)
+       if err != nil {
+               return nil, err
+       }
+       return sc.load(ctx, suffix, segPath)
+}
+
+func (sc *segmentController) load(ctx context.Context, suffix, path string) 
(seg *segment, err error) {
+       seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+       if err != nil {
+               return nil, err
+       }
+       {
+               sc.Lock()
+               defer sc.Unlock()
+               sc.lst = append(sc.lst, seg)
+       }
+       return seg, nil
+}
+
+func (sc *segmentController) close() {
+       for _, s := range sc.lst {
+               s.close()
+       }
+}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
new file mode 100644
index 0000000..5609f0a
--- /dev/null
+++ b/banyand/tsdb/shard_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb_test
+
+import (
+       "context"
+       "io/ioutil"
+       "strings"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+var _ = Describe("Shard", func() {
+       Describe("Generate segments", func() {
+               var tmp string
+               var deferFn func()
+               var shard tsdb.Shard
+
+               BeforeEach(func() {
+                       var err error
+                       tmp, deferFn, err = test.NewSpace()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               AfterEach(func() {
+                       shard.Close()
+                       deferFn()
+               })
+               It("generates several segments", func() {
+                       var err error
+                       shard, err = tsdb.OpenShard(context.TODO(), 
common.ShardID(0), tmp, tsdb.IntervalRule{
+                               Unit: tsdb.MILLISECOND,
+                               Num:  1000,
+                       })
+                       Expect(err).NotTo(HaveOccurred())
+                       Eventually(func() int {
+                               files, err := ioutil.ReadDir(tmp + "/shard-0")
+                               Expect(err).NotTo(HaveOccurred())
+                               num := 0
+                               for _, fi := range files {
+                                       if fi.IsDir() && 
strings.HasPrefix(fi.Name(), "seg-") {
+                                               num++
+                                       }
+                               }
+                               return num
+                       }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 3))
+               })
+
+       })
+})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3d740b0..a3aff1c 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,8 +49,11 @@ const (
        blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
        globalIndexTemplate = rootPrefix + "index"
 
-       segFormat   = "20060102"
-       blockFormat = "1504"
+       segDayFormat         = "20060102"
+       segMonthFormat       = "200601"
+       segYearFormat        = "2006"
+       segMillisecondFormat = "20060102150405000"
+       blockFormat          = "1504"
 
        dirPerm = 0700
 )
@@ -125,22 +128,17 @@ func (d *database) Close() error {
 }
 
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
-       db := &database{
-               location: opts.Location,
-               shardNum: opts.ShardNum,
-       }
-       parentLogger := ctx.Value(logger.ContextKey)
-       if parentLogger != nil {
-               if pl, ok := parentLogger.(*logger.Logger); ok {
-                       db.logger = pl.Named("tsdb")
-               }
-       }
        if opts.EncodingMethod.EncoderPool == nil || 
opts.EncodingMethod.DecoderPool == nil {
                return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to 
open database")
        }
        if _, err := mkdir(opts.Location); err != nil {
                return nil, err
        }
+       db := &database{
+               location: opts.Location,
+               shardNum: opts.ShardNum,
+               logger:   logger.Fetch(ctx, "tsdb"),
+       }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        var entries []fs.FileInfo
        var err error
@@ -165,13 +163,11 @@ func initDatabase(ctx context.Context, db *database) 
(Database, error) {
 func createDatabase(ctx context.Context, db *database, startID int) (Database, 
error) {
        var err error
        for i := startID; i < int(db.shardNum); i++ {
-               shardLocation, errInternal := mkdir(shardTemplate, db.location, 
i)
-               if errInternal != nil {
-                       err = multierr.Append(err, errInternal)
-                       continue
-               }
-               db.logger.Info().Int("shard_id", i).Str("path", 
shardLocation).Msg("creating a shard")
-               so, errNewShard := openShard(ctx, common.ShardID(i), 
shardLocation)
+               db.logger.Info().Int("shard_id", i).Msg("creating a shard")
+               so, errNewShard := OpenShard(ctx, common.ShardID(i), 
db.location, IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               })
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -186,17 +182,23 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        //TODO: open the manifest file
        db.Lock()
        defer db.Unlock()
-       err := walkDir(db.location, shardPathPrefix, func(name, absolutePath 
string) error {
-               shardSegs := strings.Split(name, "-")
-               shardID, err := strconv.Atoi(shardSegs[1])
+       err := walkDir(db.location, shardPathPrefix, func(suffix, _ string) 
error {
+               shardID, err := strconv.Atoi(suffix)
                if err != nil {
                        return err
                }
                if shardID >= int(db.shardNum) {
                        return nil
                }
-               db.logger.Info().Int("shard_id", shardID).Str("path", 
absolutePath).Msg("opening a shard")
-               so, errOpenShard := openShard(context.WithValue(ctx, 
logger.ContextKey, db.logger), common.ShardID(shardID), absolutePath)
+               db.logger.Info().Int("shard_id", shardID).Msg("opening a 
existing shard")
+               so, errOpenShard := OpenShard(
+                       context.WithValue(ctx, logger.ContextKey, db.logger),
+                       common.ShardID(shardID),
+                       db.location,
+                       IntervalRule{
+                               Unit: DAY,
+                               Num:  1,
+                       })
                if errOpenShard != nil {
                        return errOpenShard
                }
@@ -218,7 +220,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        return db, nil
 }
 
-type walkFn func(name, absolutePath string) error
+type walkFn func(suffix, absolutePath string) error
 
 func walkDir(root, prefix string, walkFn walkFn) error {
        files, err := ioutil.ReadDir(root)
@@ -229,7 +231,8 @@ func walkDir(root, prefix string, walkFn walkFn) error {
                if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
                        continue
                }
-               errWalk := walkFn(f.Name(), fmt.Sprintf(rootPrefix, 
root)+f.Name())
+               segs := strings.Split(f.Name(), "-")
+               errWalk := walkFn(segs[len(segs)-1], fmt.Sprintf(rootPrefix, 
root)+f.Name())
                if errWalk != nil {
                        return errors.WithMessagef(errWalk, "failed to load: 
%s", f.Name())
                }
diff --git a/pkg/logger/logger.go b/banyand/tsdb/tsdb_suite_test.go
similarity index 53%
copy from pkg/logger/logger.go
copy to banyand/tsdb/tsdb_suite_test.go
index 36896ed..c761503 100644
--- a/pkg/logger/logger.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -14,40 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package logger
+//
+package tsdb_test
 
 import (
-       "strings"
-
-       "github.com/pkg/errors"
-       "github.com/rs/zerolog"
-)
-
-var ContextKey = contextKey{}
-var ErrNoLoggerInContext = errors.New("no logger in context")
+       "testing"
 
-type contextKey struct{}
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
 
-// Logging is the config info
-type Logging struct {
-       Env   string
-       Level string
-}
-
-// Logger is wrapper for rs/zerolog logger with module, it is singleton.
-type Logger struct {
-       module string
-       *zerolog.Logger
-}
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
 
-func (l *Logger) Named(name string) *Logger {
-       module := strings.Join([]string{l.module, name}, ".")
-       subLogger := root.Logger.With().Str("module", module).Logger()
-       return &Logger{module: module, Logger: &subLogger}
+func TestTsdb(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Tsdb Suite")
 }
 
-// Loggable indicates the implement supports logging
-type Loggable interface {
-       SetLogger(*Logger)
-}
+var _ = BeforeSuite(func() {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "info",
+       })).Should(Succeed())
+})
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index bfb9957..75c1cb8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -60,7 +60,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, 
tempDir string) {
        seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
        validateDirectory(tester, seriesPath)
        now := time.Now()
-       segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+       segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
        validateDirectory(tester, segPath)
        validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, 
now.Format(blockFormat)))
 }
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 36896ed..98a190a 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -18,6 +18,7 @@
 package logger
 
 import (
+       "context"
        "strings"
 
        "github.com/pkg/errors"
@@ -51,3 +52,14 @@ func (l *Logger) Named(name string) *Logger {
 type Loggable interface {
        SetLogger(*Logger)
 }
+
+func Fetch(ctx context.Context, name string) *Logger {
+       parentLogger := ctx.Value(ContextKey)
+       if parentLogger == nil {
+               return GetLogger(name)
+       }
+       if pl, ok := parentLogger.(*Logger); ok {
+               return pl.Named(name)
+       }
+       return GetLogger(name)
+}
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index bacb9a1..95f3e3e 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -77,7 +77,9 @@ func executeForShard(series tsdb.SeriesList, timeRange 
tsdb.TimeRange,
                itersInSeries, err := func() ([]tsdb.Iterator, error) {
                        sp, errInner := seriesFound.Span(timeRange)
                        defer func(sp tsdb.SeriesSpan) {
-                               _ = sp.Close()
+                               if sp != nil {
+                                       _ = sp.Close()
+                               }
                        }(sp)
                        if errInner != nil {
                                return nil, errInner
diff --git a/pkg/query/logical/plan_indexscan_local.go 
b/pkg/query/logical/plan_indexscan_local.go
index bd22fd1..a18e62e 100644
--- a/pkg/query/logical/plan_indexscan_local.go
+++ b/pkg/query/logical/plan_indexscan_local.go
@@ -108,7 +108,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, 
error) {
 
        return &localIndexScan{
                orderBy:             orderBySubPlan,
-               timeRange:           tsdb.NewTimeRange(uis.startTime, 
uis.endTime),
+               timeRange:           tsdb.NewInclusiveTimeRange(uis.startTime, 
uis.endTime),
                schema:              s,
                projectionFieldRefs: projFieldsRefs,
                metadata:            uis.metadata,

Reply via email to