This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new c1905988 Fix a bug and update API (#645) c1905988 is described below commit c190598804f9dd651b19a188cac2869bf4d8f5c7 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Mon Apr 14 11:10:02 2025 +0800 Fix a bug and update API (#645) --- CHANGES.md | 5 + api/proto/banyandb/common/v1/common.proto | 4 +- api/proto/banyandb/measure/v1/query.proto | 4 +- api/proto/banyandb/property/v1/rpc.proto | 2 - api/proto/banyandb/stream/v1/query.proto | 4 +- api/proto/banyandb/version.go | 2 +- banyand/internal/storage/rotation.go | 103 ++++--- banyand/internal/storage/rotation_test.go | 26 +- banyand/internal/storage/segment.go | 200 ++++++++++--- banyand/internal/storage/segment_test.go | 481 ++++++++++++++++++++++++++++++ banyand/internal/storage/shard.go | 2 +- banyand/internal/storage/storage.go | 2 +- banyand/internal/storage/tsdb.go | 16 +- banyand/internal/storage/tsdb_test.go | 7 +- banyand/liaison/grpc/discovery.go | 28 +- banyand/liaison/grpc/measure.go | 12 +- banyand/liaison/grpc/stream.go | 12 +- banyand/measure/metadata.go | 11 +- banyand/measure/query.go | 5 +- banyand/queue/pub/pub.go | 25 +- banyand/queue/pub/pub_test.go | 16 +- banyand/queue/pub/selector.go | 3 + banyand/stream/metadata.go | 11 +- banyand/stream/query.go | 193 ++++++++---- docs/api-reference.md | 7 +- pkg/bus/bus.go | 6 +- 26 files changed, 971 insertions(+), 216 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f252e132..a0cecbf9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,11 @@ Release Notes. ### Features - Add sharding_key for TopNAggregation source measure +- API: Update the data matching rule from the node selector to the stage name. + +### Bug Fixes + +- Fix the deadlock issue when loading a closed segment. ## 0.8.0 diff --git a/api/proto/banyandb/common/v1/common.proto b/api/proto/banyandb/common/v1/common.proto index 24846c56..2c6e5d75 100644 --- a/api/proto/banyandb/common/v1/common.proto +++ b/api/proto/banyandb/common/v1/common.proto @@ -93,8 +93,8 @@ message ResourceOpts { IntervalRule ttl = 3; // stages defines the ordered lifecycle stages. Data progresses through these stages sequentially. 
repeated LifecycleStage stages = 4; - // default_node_selector is the default node selector for queries if node_selector is not specified - string default_node_selector = 5; + // default_stages are the names of the default stages + repeated string default_stages = 5; } // Group is an internal object for Group management diff --git a/api/proto/banyandb/measure/v1/query.proto b/api/proto/banyandb/measure/v1/query.proto index 4537bcf5..05f8309b 100644 --- a/api/proto/banyandb/measure/v1/query.proto +++ b/api/proto/banyandb/measure/v1/query.proto @@ -111,6 +111,6 @@ message QueryRequest { model.v1.QueryOrder order_by = 12; // trace is used to enable trace for the query bool trace = 13; - // node_selector is used to specify the target node for the query - string node_selector = 14; + // stages is used to specify the stages of the data points in the lifecycle + repeated string stages = 14; } diff --git a/api/proto/banyandb/property/v1/rpc.proto b/api/proto/banyandb/property/v1/rpc.proto index 01c032dd..8f70291b 100644 --- a/api/proto/banyandb/property/v1/rpc.proto +++ b/api/proto/banyandb/property/v1/rpc.proto @@ -78,8 +78,6 @@ message QueryRequest { uint32 limit = 6; // trace is used to enable trace for the query bool trace = 7; - // node_selector is used to select the node to query - string node_selector = 8; } // QueryResponse is the response for a query to the Query module. diff --git a/api/proto/banyandb/stream/v1/query.proto b/api/proto/banyandb/stream/v1/query.proto index 4fe18f9e..ae23dd33 100644 --- a/api/proto/banyandb/stream/v1/query.proto +++ b/api/proto/banyandb/stream/v1/query.proto @@ -77,6 +77,6 @@ message QueryRequest { model.v1.TagProjection projection = 8 [(validate.rules).message.required = true]; // trace is used to enable trace for the query bool trace = 9; - // node_selector is used to select the node to query - string node_selector = 10; + // stages is used to specify the stages of the elements in the lifecycle + repeated string stages = 10; } diff --git a/api/proto/banyandb/version.go b/api/proto/banyandb/version.go index 7b89d508..2a4b305f 100644 --- a/api/proto/banyandb/version.go +++ b/api/proto/banyandb/version.go @@ -19,7 +19,7 @@ package apiversion // Version is the version of the API. -const Version = "0.8" +const Version = "0.9" // Revision is the revision of the API. Building with -ldflags -X.
var revision string diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go index e8c6df9f..9d77b339 100644 --- a/banyand/internal/storage/rotation.go +++ b/banyand/internal/storage/rotation.go @@ -49,47 +49,80 @@ func (d *database[T, O]) startRotationTask() error { rt = newRetentionTask(d, options.TTL) } go func(rt *retentionTask[T, O]) { - for ts := range d.tsEventCh { - func(ts int64) { - d.rotationProcessOn.Store(true) - defer d.rotationProcessOn.Store(false) - t := time.Unix(0, ts) - if rt != nil { - rt.run(t, d.logger) + var idleCheckTicker *time.Ticker + var idleCheckC <-chan time.Time + + // Only create the ticker if idleTimeout is at least 1 second + if d.segmentController.idleTimeout >= time.Second { + idleCheckTicker = time.NewTicker(10 * time.Minute) + idleCheckC = idleCheckTicker.C + defer func() { + if idleCheckTicker != nil { + idleCheckTicker.Stop() } - func() { - ss := d.segmentController.segments() - if len(ss) == 0 { - return + }() + } + + for { + select { + case ts, ok := <-d.tsEventCh: + if !ok { + d.logger.Debug().Msg("tsEventCh closed") + return + } + func(ts int64) { + d.rotationProcessOn.Store(true) + defer d.rotationProcessOn.Store(false) + t := time.Unix(0, ts) + if rt != nil { + rt.run(t, d.logger) } - defer func() { - for i := 0; i < len(ss); i++ { - ss[i].DecRef() + func() { + ss, err := d.segmentController.segments(true) // Ensure segments are open + if err != nil { + d.logger.Error().Err(err).Msg("failed to get segments") + return } - }() - for i := range ss { - if ss[i].End.UnixNano() < ts { - ss[i].index.store.Reset() + if len(ss) == 0 { + return } - } - latest := ss[len(ss)-1] - gap := latest.End.UnixNano() - ts - // gap <=0 means the event is from the future - // the segment will be created by a written event directly - if gap <= 0 || gap > newSegmentTimeGap { - return - } - d.incTotalRotationStarted(1) - defer d.incTotalRotationFinished(1) - start := options.SegmentInterval.nextTime(t) - d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment") - _, err := d.segmentController.create(start) - if err != nil { - d.logger.Error().Err(err).Msgf("failed to create new segment.") - d.incTotalRotationErr(1) + defer func() { + for i := 0; i < len(ss); i++ { + ss[i].DecRef() + } + }() + for i := range ss { + if ss[i].End.UnixNano() < ts { + ss[i].index.store.Reset() + } + } + latest := ss[len(ss)-1] + gap := latest.End.UnixNano() - ts + // gap <=0 means the event is from the future + // the segment will be created by a written event directly + if gap <= 0 || gap > newSegmentTimeGap { + return + } + d.incTotalRotationStarted(1) + defer d.incTotalRotationFinished(1) + start := options.SegmentInterval.nextTime(t) + d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment") + _, err = d.segmentController.create(start) + if err != nil { + d.logger.Error().Err(err).Msgf("failed to create new segment.") + d.incTotalRotationErr(1) + } + }() + }(ts) + case <-idleCheckC: + func() { + d.logger.Debug().Msg("checking for idle segments") + closedCount := d.segmentController.closeIdleSegments() + if closedCount > 0 { + d.logger.Info().Int("count", closedCount).Msg("closed idle segments") } }() - }(ts) + } } }(rt) if rt == nil { diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index a0aed52e..43f292ef 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -81,8 +81,9 @@ func 
TestSegmentBoundaryUpdateFn(t *testing.T) { t.Logf("current time: %s", ts.Format(time.RFC3339)) expected := i + 2 require.EventuallyWithTf(t, func(ct *assert.CollectT) { - if len(segCtrl.segments()) != expected { - ct.Errorf("expect %d segments, got %d", expected, len(segCtrl.segments())) + segments, _ := segCtrl.segments(false) + if len(segments) != expected { + ct.Errorf("expect %d segments, got %d", expected, len(segments)) } }, flags.EventuallyTimeout, time.Millisecond, "wait for %d segment to be created", expected) ts = ts.Add(time.Hour) @@ -111,7 +112,8 @@ func TestForwardRotation(t *testing.T) { t.Logf("current time: %s", ts.Format(time.RFC3339)) tsdb.Tick(ts.UnixNano()) assert.Eventually(t, func() bool { - return len(segCtrl.segments()) == 2 + segments, _ := segCtrl.segments(false) + return len(segments) == 2 }, flags.EventuallyTimeout, time.Millisecond, "wait for the second segment to be created") }) @@ -122,7 +124,8 @@ func TestForwardRotation(t *testing.T) { t.Logf("current time: %s", ts.Format(time.RFC3339)) tsdb.Tick(ts.UnixNano()) assert.Never(t, func() bool { - return len(segCtrl.segments()) == 2 + segments, _ := segCtrl.segments(false) + return len(segments) == 2 }, flags.NeverTimeout, time.Millisecond, "wait for the second segment never to be created") }) } @@ -140,8 +143,9 @@ func TestRetention(t *testing.T) { tsdb.Tick(ts.UnixNano()) expected := i + 2 require.EventuallyWithTf(t, func(ct *assert.CollectT) { - if len(segCtrl.segments()) != expected { - ct.Errorf("expect %d segments, got %d", expected, len(segCtrl.segments())) + segments, _ := segCtrl.segments(false) + if len(segments) != expected { + ct.Errorf("expect %d segments, got %d", expected, len(segments)) } }, flags.EventuallyTimeout, time.Millisecond, "wait for %d segment to be created", expected) // amend the time to the next day @@ -151,7 +155,8 @@ func TestRetention(t *testing.T) { c.Set(ts) tsdb.Tick(ts.UnixNano()) assert.Eventually(t, func() bool { - return len(segCtrl.segments()) == 4 + segments, _ := segCtrl.segments(false) + return len(segments) == 4 }, flags.EventuallyTimeout, time.Millisecond, "wait for the 1st segment to be deleted") }) @@ -165,7 +170,7 @@ func TestRetention(t *testing.T) { tsdb.Tick(ts.UnixNano()) ts = ts.Add(time.Hour) require.EventuallyWithTf(t, func(ct *assert.CollectT) { - ss := segCtrl.segments() + ss, _ := segCtrl.segments(false) defer func() { for i := range ss { ss[i].DecRef() @@ -184,7 +189,7 @@ func TestRetention(t *testing.T) { c.Set(ts) tsdb.Tick(ts.UnixNano()) require.EventuallyWithTf(t, func(ct *assert.CollectT) { - ss := segCtrl.segments() + ss, _ := segCtrl.segments(false) defer func() { for i := range ss { ss[i].DecRef() @@ -233,7 +238,8 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], timestamp.MockClock, * defer seg.DecRef() db := tsdb.(*database[*MockTSTable, any]) - require.Equal(t, len(db.segmentController.segments()), 1) + segments, _ := db.segmentController.segments(false) + require.Equal(t, len(segments), 1) return db, mc, db.segmentController, tracker, func() { tsdb.Close() defFn() diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index b96da252..27bb2059 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -44,17 +44,21 @@ import ( // ErrExpiredData is returned when the data is expired. var ErrExpiredData = errors.New("expired data") +// ErrSegmentClosed is returned when trying to access a closed segment. 
+var ErrSegmentClosed = errors.New("segment closed") + type segment[T TSTable, O any] struct { - metrics any - option O - creator TSTableCreator[T, O] - l *logger.Logger - index *seriesIndex - sLst atomic.Pointer[[]*shard[T]] - position common.Position + metrics any + tsdbOpts *TSDBOpts[T, O] + l *logger.Logger + index *seriesIndex + sLst atomic.Pointer[[]*shard[T]] + indexMetrics *inverted.Metrics + position common.Position timestamp.TimeRange suffix string location string + lastAccessed atomic.Int64 mu sync.Mutex refCount int32 mustBeDeleted uint32 @@ -74,24 +78,20 @@ func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, e }) options := sc.getOptions() id := generateSegID(options.SegmentInterval.Unit, suffixInteger) - sir, err := newSeriesIndex(ctx, path, options.SeriesIndexFlushTimeoutSeconds, options.SeriesIndexCacheMaxBytes, sc.indexMetrics) - if err != nil { - return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) - } + s = &segment[T, O]{ - id: id, - location: path, - suffix: suffix, - TimeRange: timestamp.NewSectionTimeRange(startTime, endTime), - position: p, - refCount: 1, - index: sir, - metrics: sc.metrics, - creator: options.TSTableCreator, - option: options.Option, + id: id, + location: path, + suffix: suffix, + TimeRange: timestamp.NewSectionTimeRange(startTime, endTime), + position: p, + metrics: sc.metrics, + indexMetrics: sc.indexMetrics, + tsdbOpts: options, } s.l = logger.Fetch(ctx, s.String()) - return s, s.loadShards(int(options.ShardNum)) + s.lastAccessed.Store(time.Now().UnixNano()) + return s, s.initialize(ctx) } func (s *segment[T, O]) loadShards(shardNum int) error { @@ -104,7 +104,7 @@ func (s *segment[T, O]) loadShards(shardNum int) error { return nil } s.l.Info().Int("shard_id", shardID).Msg("loaded an existing shard") - _, err = s.CreateTSTableIfNotExist(common.ShardID(shardID)) + _, err = s.createShardIfNotExist(common.ShardID(shardID)) return err }) } @@ -123,31 +123,99 @@ func (s *segment[T, O]) Tables() (tt []T) { return tt } -func (s *segment[T, O]) incRef() { +func (s *segment[T, O]) incRef(ctx context.Context) error { + s.lastAccessed.Store(time.Now().UnixNano()) + if atomic.LoadInt32(&s.refCount) <= 0 { + return s.initialize(ctx) + } atomic.AddInt32(&s.refCount, 1) + return nil +} + +func (s *segment[T, O]) initialize(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if atomic.LoadInt32(&s.refCount) > 0 { + return nil + } + + ctx = context.WithValue(ctx, logger.ContextKey, s.l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return s.position + }) + + sir, err := newSeriesIndex(ctx, s.location, s.tsdbOpts.SeriesIndexFlushTimeoutSeconds, s.tsdbOpts.SeriesIndexCacheMaxBytes, s.indexMetrics) + if err != nil { + return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) + } + s.index = sir + + err = s.loadShards(int(s.tsdbOpts.ShardNum)) + if err != nil { + s.index.Close() + s.index = nil + return errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load shards failed").Error()) + } + atomic.StoreInt32(&s.refCount, 1) + + s.l.Info().Stringer("seg", s).Msg("segment initialized") + return nil } func (s *segment[T, O]) DecRef() { - n := atomic.AddInt32(&s.refCount, -1) - if n > 0 { + shouldCleanup := false + + if atomic.LoadInt32(&s.refCount) <= 0 && atomic.LoadUint32(&s.mustBeDeleted) != 0 { + shouldCleanup = true + } else { + for { + current := atomic.LoadInt32(&s.refCount) +
if current <= 0 { + return + } + + if atomic.CompareAndSwapInt32(&s.refCount, current, current-1) { + shouldCleanup = current == 1 + break + } + } + } + + if !shouldCleanup { return } + + s.performCleanup() +} + +func (s *segment[T, O]) performCleanup() { s.mu.Lock() defer s.mu.Unlock() + if atomic.LoadInt32(&s.refCount) > 0 && atomic.LoadUint32(&s.mustBeDeleted) == 0 { + return + } + deletePath := "" if atomic.LoadUint32(&s.mustBeDeleted) != 0 { deletePath = s.location } - if err := s.index.Close(); err != nil { - s.l.Panic().Err(err).Msg("failed to close the series index") + if s.index != nil { + if err := s.index.Close(); err != nil { + s.l.Panic().Err(err).Msg("failed to close the series index") + } + s.index = nil } sLst := s.sLst.Load() if sLst != nil { - for _, s := range *sLst { - s.close() + for _, shard := range *sLst { + shard.close() + } + if deletePath == "" { + s.sLst.Store(&[]*shard[T]{}) } } @@ -167,6 +235,10 @@ func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) { } s.mu.Lock() defer s.mu.Unlock() + return s.createShardIfNotExist(id) +} + +func (s *segment[T, O]) createShardIfNotExist(id common.ShardID) (T, error) { if s, ok := s.getShard(id); ok { return s.table, nil } @@ -218,13 +290,14 @@ type segmentController[T TSTable, O any] struct { location string lst []*segment[T, O] deadline atomic.Int64 + idleTimeout time.Duration optsMutex sync.RWMutex sync.RWMutex } func newSegmentController[T TSTable, O any](ctx context.Context, location string, l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, metrics Metrics, - segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, + segmentsBoundaryUpdateFn SegmentBoundaryUpdateFn, idleTimeout time.Duration, ) *segmentController[T, O] { clock, _ := timestamp.GetClock(ctx) p := common.GetPosition(ctx) @@ -239,6 +312,7 @@ func newSegmentController[T TSTable, O any](ctx context.Context, location string segmentBoundaryUpdateFn: segmentsBoundaryUpdateFn, stage: p.Stage, db: p.Database, + idleTimeout: idleTimeout, } } @@ -261,21 +335,24 @@ func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.Resource sc.opts.ShardNum = resourceOpts.ShardNum } -func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O]) { +func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O], err error) { sc.RLock() defer sc.RUnlock() last := len(sc.lst) - 1 + ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l) for i := range sc.lst { s := sc.lst[last-i] if s.GetTimeRange().End.Before(timeRange.Start) { break } if s.Overlapping(timeRange) { - s.incRef() + if err = s.incRef(ctx); err != nil { + return nil, err + } tt = append(tt, s) } } - return tt + return tt, nil } func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) { @@ -287,16 +364,16 @@ func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], if err != nil { return nil, err } - s.incRef() sc.notifySegmentBoundaryUpdate() - return s, nil + return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l)) } func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() { if sc.segmentBoundaryUpdateFn == nil { return } - segs := sc.segments() + // No error if we do not open closed segments. 
+ segs, _ := sc.segments(false) defer func() { for i := range segs { segs[i].DecRef() @@ -312,15 +389,49 @@ func (sc *segmentController[T, O]) notifySegmentBoundaryUpdate() { sc.segmentBoundaryUpdateFn(sc.stage, sc.db, tr) } -func (sc *segmentController[T, O]) segments() (ss []*segment[T, O]) { +func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) { sc.RLock() defer sc.RUnlock() r := make([]*segment[T, O], len(sc.lst)) + ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l) for i := range sc.lst { - sc.lst[i].incRef() + if reopenClosed { + if err = sc.lst[i].incRef(ctx); err != nil { + return nil, err + } + } else { + if atomic.LoadInt32(&sc.lst[i].refCount) > 0 { + atomic.AddInt32(&sc.lst[i].refCount, 1) + } + } r[i] = sc.lst[i] } - return r + return r, nil +} + +func (sc *segmentController[T, O]) closeIdleSegments() int { + maxIdleTime := sc.idleTimeout + + now := time.Now().UnixNano() + idleThreshold := now - maxIdleTime.Nanoseconds() + + segs, _ := sc.segments(false) + closedCount := 0 + + for _, seg := range segs { + lastAccess := seg.lastAccessed.Load() + // Only consider segments that have been idle for longer than the threshold + // and have active references (are not already closed) + if lastAccess < idleThreshold && atomic.LoadInt32(&seg.refCount) > 0 { + seg.DecRef() + } + seg.DecRef() + if atomic.LoadInt32(&seg.refCount) == 0 { + closedCount++ + } + } + + return closedCount } func (sc *segmentController[T, O]) format(tm time.Time) string { @@ -446,7 +557,8 @@ func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg } func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool, err error) { - for _, s := range sc.segments() { + ss, _ := sc.segments(false) + for _, s := range ss { if s.Before(deadline) { hasSegment = true id := s.id @@ -468,7 +580,8 @@ func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.Time IncludeStart: true, IncludeEnd: false, } - for _, s := range sc.segments() { + ss, _ := sc.segments(false) + for _, s := range ss { if s.Before(deadline) { if timeRange.Start.IsZero() { timeRange.Start = s.Start @@ -483,7 +596,8 @@ func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.Time func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.TimeRange) int64 { deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration()) var count int64 - for _, s := range sc.segments() { + ss, _ := sc.segments(false) + for _, s := range ss { if s.Before(deadline) && s.Overlapping(timeRange) { s.delete() sc.Lock() diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go new file mode 100644 index 00000000..094bf475 --- /dev/null +++ b/banyand/internal/storage/segment_test.go @@ -0,0 +1,481 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package storage + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +// mockTSTable is a minimal implementation of TSTable for testing. +type mockTSTable struct { + ID common.ShardID +} + +func (m mockTSTable) Close() error { + return nil +} + +func (m mockTSTable) Collect(Metrics) {} + +func (m mockTSTable) TakeFileSnapshot(string) error { return nil } + +// mockTSTableOpener implements the necessary functions to open a TSTable. +type mockTSTableOpener struct{} + +func setupTestEnvironment(t *testing.T) (string, func()) { + t.Helper() + tempDir := t.TempDir() + + // Create test logger + err := logger.Init(logger.Logging{ + Env: "test", + Level: "info", + }) + require.NoError(t, err) + + return tempDir, func() { + // Cleanup function + os.RemoveAll(tempDir) + } +} + +func TestSegmentOpenAndReopen(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-segment") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{ + Unit: DAY, + Num: 1, + }, + TTL: IntervalRule{ + Unit: DAY, + Num: 7, + }, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, // indexMetrics + nil, // metrics + nil, // segmentBoundaryUpdateFn + 5*time.Minute, // idleTimeout + ) + + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + endTime := startTime.Add(24 * time.Hour) + + // Create and open a segment + segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat)) + err := os.MkdirAll(segmentPath, DirPerm) + require.NoError(t, err) + + // Write metadata file + metadataPath := filepath.Join(segmentPath, metadataFilename) + err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm) + require.NoError(t, err) + + // Open segment + suffix := startTime.Format(dayFormat) + segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix) + require.NoError(t, err) + require.NotNil(t, segment) + + // Verify segment is open + assert.Greater(t, segment.refCount, int32(0)) + + // Close segment by decrementing reference count + initialRefCount := segment.refCount + segment.DecRef() + + // Verify segment is 
closed (refCount reduced) + assert.Equal(t, initialRefCount-1, segment.refCount) + + // Reopen segment + segment.incRef(ctx) + + // Verify segment is properly reopened + assert.Equal(t, initialRefCount, segment.refCount) + + // Verify we can still access segment data + assert.NotNil(t, segment.index) + assert.Equal(t, startTime, segment.Start) + assert.Equal(t, endTime, segment.End) + + // Test that we can create a TSTable after reopening + table, err := segment.CreateTSTableIfNotExist(0) + require.NoError(t, err) + assert.Equal(t, common.ShardID(0), table.ID) +} + +func TestSegmentCloseIfIdle(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-segment") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{ + Unit: DAY, + Num: 1, + }, + TTL: IntervalRule{ + Unit: DAY, + Num: 7, + }, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, // indexMetrics + nil, // metrics + nil, // segmentBoundaryUpdateFn + time.Second, // Set short idle timeout for testing + ) + + // Test time parameters + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + endTime := startTime.Add(24 * time.Hour) + + // Create and open a segment + segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat)) + err := os.MkdirAll(segmentPath, DirPerm) + require.NoError(t, err) + + // Write metadata file + metadataPath := filepath.Join(segmentPath, metadataFilename) + err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm) + require.NoError(t, err) + + // Open segment + suffix := startTime.Format(dayFormat) + segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix) + require.NoError(t, err) + + // Force last access time to be in the past + segment.lastAccessed.Store(time.Now().Add(-time.Minute).UnixNano()) + + // Close if idle should succeed + segment.DecRef() + + // Verify segment is closed + assert.Nil(t, segment.index) + + // Test reopening the segment + segment.incRef(ctx) + + // Verify segment is properly reopened + assert.NotNil(t, segment.index) + assert.Greater(t, segment.refCount, int32(0)) +} + +func TestCloseIdleAndSelectSegments(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-segment-controller") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{ + Unit: DAY, + 
Num: 1, + }, + TTL: IntervalRule{ + Unit: DAY, + Num: 7, + }, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + // Create segment controller with a short idle timeout (100ms) + idleTimeout := 100 * time.Millisecond + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, // indexMetrics + nil, // metrics + nil, // segmentBoundaryUpdateFn + idleTimeout, // short idle timeout + ) + + // Test time parameters + now := time.Now().UTC() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + day2 := day1.Add(24 * time.Hour) + day3 := day2.Add(24 * time.Hour) + + // Create multiple segments + createSegment := func(startTime time.Time) *segment[mockTSTable, mockTSTableOpener] { + segmentPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat)) + err := os.MkdirAll(segmentPath, DirPerm) + require.NoError(t, err) + + // Write metadata file + metadataPath := filepath.Join(segmentPath, metadataFilename) + err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm) + require.NoError(t, err) + + // Open segment + suffix := startTime.Format(dayFormat) + segment, err := sc.openSegment(ctx, startTime, startTime.Add(24*time.Hour), segmentPath, suffix) + require.NoError(t, err) + + // Add segment to controller's list + sc.Lock() + sc.lst = append(sc.lst, segment) + sc.sortLst() + sc.Unlock() + + return segment + } + + // Create three segments + seg1 := createSegment(day1) + seg2 := createSegment(day2) + seg3 := createSegment(day3) + + // Verify we have three segments + require.Len(t, sc.lst, 3) + + // Make sure all segments have reference counts > 0 + require.Greater(t, seg1.refCount, int32(0)) + require.Greater(t, seg2.refCount, int32(0)) + require.Greater(t, seg3.refCount, int32(0)) + + // Force segments 1 and 3 to be idle (setting last accessed time in the past) + seg1.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) + seg3.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano()) + + // Keep seg2 active + seg2.lastAccessed.Store(time.Now().UnixNano()) + + // Close idle segments + closedCount := sc.closeIdleSegments() + + // We should have closed 2 segments (seg1 and seg3) + assert.Equal(t, 2, closedCount) + + // Check that seg1 and seg3 are closed (index is nil) + assert.Nil(t, seg1.index) + assert.Nil(t, seg3.index) + + // While seg2 remains open + assert.NotNil(t, seg2.index) + + // Now select segments using the entire time range + timeRange := timestamp.NewInclusiveTimeRange(day1, day3.Add(24*time.Hour)) + selectedSegments, err := sc.selectSegments(timeRange) + require.NoError(t, err) + + // Should have selected all 3 segments + require.Len(t, selectedSegments, 3) + + // Verify segments were reopened (they should have an index again) + for _, s := range selectedSegments { + seg := s.(*segment[mockTSTable, mockTSTableOpener]) + assert.NotNil(t, seg.index, "Selected segment should be reopened with a valid index") + assert.Greater(t, seg.refCount, int32(0), "Selected segment should have positive reference count") + seg.DecRef() // Cleanup + } +} + +func TestOpenExistingSegmentWithShards(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-segment-shards") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "test-stage", + } + }) + + opts := 
TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, location string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + shardID := common.ShardID(0) + // Extract shard ID from the path + if shardPath := filepath.Base(location); len(shardPath) > 6 { + if id, err := strconv.Atoi(shardPath[6:]); err == nil { + shardID = common.ShardID(id) + } + } + return mockTSTable{ID: shardID}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{ + Unit: DAY, + Num: 1, + }, + TTL: IntervalRule{ + Unit: DAY, + Num: 7, + }, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, // indexMetrics + nil, // metrics + nil, // segmentBoundaryUpdateFn + 5*time.Minute, // idleTimeout + ) + + // Test time parameters + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + endTime := startTime.Add(24 * time.Hour) + + // Create segment directory + suffix := startTime.Format(dayFormat) + segmentPath := filepath.Join(tempDir, "segment-"+suffix) + err := os.MkdirAll(segmentPath, DirPerm) + require.NoError(t, err) + + // Write metadata file + metadataPath := filepath.Join(segmentPath, metadataFilename) + err = os.WriteFile(metadataPath, []byte(currentVersion), FilePerm) + require.NoError(t, err) + + // Create shard directories + for i := 0; i < int(opts.ShardNum); i++ { + shardPath := filepath.Join(segmentPath, fmt.Sprintf("shard-%d", i)) + err = os.MkdirAll(shardPath, DirPerm) + require.NoError(t, err) + + // Add a metadata file to each shard + shardMetadataPath := filepath.Join(shardPath, metadataFilename) + err = os.WriteFile(shardMetadataPath, []byte(currentVersion), FilePerm) + require.NoError(t, err) + } + + // Open the segment + segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix) + require.NoError(t, err) + require.NotNil(t, segment) + + // Verify both shards were loaded + shardList := segment.sLst.Load() + require.NotNil(t, shardList, "Shard list should not be nil") + require.Len(t, *shardList, 2, "Segment should have loaded two shards") + + // Verify shard IDs + shardIDs := make([]common.ShardID, 2) + for i, shard := range *shardList { + shardIDs[i] = shard.id + } + assert.Contains(t, shardIDs, common.ShardID(0), "Shard 0 should be loaded") + assert.Contains(t, shardIDs, common.ShardID(1), "Shard 1 should be loaded") + + // Verify tables can be retrieved + tables := segment.Tables() + require.Len(t, tables, 2, "Should have 2 tables") + + // Make sure each table has the correct ID + tableIDs := make([]common.ShardID, 2) + for i, table := range tables { + tableIDs[i] = table.ID + } + assert.Contains(t, tableIDs, common.ShardID(0), "Table for shard 0 should exist") + assert.Contains(t, tableIDs, common.ShardID(1), "Table for shard 1 should exist") + + // Clean up + segment.DecRef() +} diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index a75db393..6d1c576f 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -43,7 +43,7 @@ func (s *segment[T, O]) openShard(ctx context.Context, id common.ShardID) (*shar l.Info().Int("shard_id", int(id)).Str("path", location).Msg("loading a shard") p := common.GetPosition(ctx) p.Shard = strconv.Itoa(int(id)) - t, err := s.creator(lfs, location, p, l, s.TimeRange, s.option, s.metrics) + t, err := 
s.tsdbOpts.TSTableCreator(lfs, location, p, l, s.TimeRange, s.tsdbOpts.Option, s.metrics) if err != nil { return nil, err } diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 3e8c148a..1367e367 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -105,7 +105,7 @@ type IndexDB interface { type TSDB[T TSTable, O any] interface { io.Closer CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) - SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] + SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) Tick(ts int64) UpdateOptions(opts *commonv1.ResourceOpts) TakeFileSnapshot(dst string) error diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index e3ac32cc..eca40aa7 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -67,6 +67,7 @@ type TSDBOpts[T TSTable, O any] struct { SeriesIndexCacheMaxBytes int ShardNum uint32 DisableRetention bool + SegmentIdleTimeout time.Duration } type ( @@ -136,8 +137,8 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ logger: l, tsEventCh: make(chan int64), p: p, - segmentController: newSegmentController[T](ctx, location, - l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn), + segmentController: newSegmentController(ctx, location, + l, opts, indexMetrics, opts.TableMetrics, opts.SegmentBoundaryUpdateFn, opts.SegmentIdleTimeout), metrics: newMetrics(opts.StorageMetricsFactory), disableRetention: opts.DisableRetention, } @@ -162,9 +163,9 @@ func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], e return d.segmentController.createSegment(ts) } -func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] { +func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) { if d.closed.Load() { - return nil + return nil, nil } return d.segmentController.selectSegments(timeRange) } @@ -181,7 +182,10 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) error { return errors.New("database is closed") } - segments := d.segmentController.segments() + segments, err := d.segmentController.segments(true) + if err != nil { + return errors.Wrap(err, "failed to get segments") + } defer func() { for _, seg := range segments { seg.DecRef() @@ -239,7 +243,7 @@ func (d *database[T, O]) collect() { } d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load())) refCount := int32(0) - ss := d.segmentController.segments() + ss, _ := d.segmentController.segments(false) for _, s := range ss { for _, t := range s.Tables() { t.Collect(d.segmentController.metrics) diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go index dd96807d..ba5eb623 100644 --- a/banyand/internal/storage/tsdb_test.go +++ b/banyand/internal/storage/tsdb_test.go @@ -66,7 +66,8 @@ func TestOpenTSDB(t *testing.T) { defer seg.DecRef() db := tsdb.(*database[*MockTSTable, any]) - require.Equal(t, len(db.segmentController.segments()), 1) + ss, _ := db.segmentController.segments(false) + require.Equal(t, len(ss), 1) tsdb.Close() }) @@ -99,7 +100,7 @@ func TestOpenTSDB(t *testing.T) { seg.DecRef() db := tsdb.(*database[*MockTSTable, any]) - segs := db.segmentController.segments() + segs, _ := db.segmentController.segments(false) require.Equal(t, len(segs), 1) for i := range segs { segs[i].DecRef() @@ -112,7 +113,7 @@ func TestOpenTSDB(t *testing.T) { require.NotNil(t, tsdb) 
db = tsdb.(*database[*MockTSTable, any]) - segs = db.segmentController.segments() + segs, _ = db.segmentController.segments(false) require.Equal(t, len(segs), 1) for i := range segs { segs[i].DecRef() diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 7eac7940..b85fd472 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -19,6 +19,7 @@ package grpc import ( "fmt" + "strings" "sync" "github.com/pkg/errors" @@ -162,14 +163,35 @@ func (s *groupRepo) shardNum(groupName string) (uint32, bool) { return r.ShardNum, true } -func (s *groupRepo) getNodeSelector(groupName string) (string, bool) { +func (s *groupRepo) getNodeSelector(groupName string, stages []string) ([]string, bool) { s.RWMutex.RLock() defer s.RWMutex.RUnlock() r, ok := s.resourceOpts[groupName] if !ok { - return "", false + return nil, false } - return r.DefaultNodeSelector, true + if len(stages) == 0 { + stages = r.DefaultStages + } + if len(stages) == 0 { + return nil, false + } + + var nodeSelectors []string + for _, stage := range r.Stages { + for _, sn := range stages { + if strings.EqualFold(sn, stage.Name) { + ns := stage.NodeSelector + ns = strings.TrimSpace(ns) + if ns == "" { + continue + } + nodeSelectors = append(nodeSelectors, ns) + break + } + } + } + return nodeSelectors, true } func getID(metadata *commonv1.Metadata) identity { diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index a6761208..15c72fa5 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -215,17 +215,13 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest span.Stop() }() } - nodeSelectors := make(map[string]string) + nodeSelectors := make(map[string][]string) for _, g := range req.Groups { - if req.NodeSelector != "" { - nodeSelectors[g] = req.NodeSelector - continue - } - if ns, exist := ms.groupRepo.getNodeSelector(g); exist { + if ns, exist := ms.groupRepo.getNodeSelector(g, req.Stages); exist { nodeSelectors[g] = ns - continue + } else { + nodeSelectors[g] = nil } - nodeSelectors[g] = "" } feat, err := ms.broadcaster.Publish(ctx, data.TopicMeasureQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req)) diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 0bbbef5c..b1920878 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -213,17 +213,13 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( span.Stop() }() } - nodeSelectors := make(map[string]string) + nodeSelectors := make(map[string][]string) for _, g := range req.Groups { - if req.NodeSelector != "" { - nodeSelectors[g] = req.NodeSelector - continue - } - if ns, exist := s.groupRepo.getNodeSelector(g); exist { + if ns, exist := s.groupRepo.getNodeSelector(g, req.Stages); exist { nodeSelectors[g] = ns - continue + } else { + nodeSelectors[g] = nil } - nodeSelectors[g] = "" } message := bus.NewMessageWithNodeSelectors(bus.MessageID(now.UnixNano()), nodeSelectors, req.TimeRange, req) feat, errQuery := s.broadcaster.Publish(ctx, data.TopicStreamQuery, message) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index c9a5f168..85560df8 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -384,8 +384,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error shardNum := ro.ShardNum ttl := ro.Ttl segInterval := 
ro.SegmentInterval + segmentIdleTimeout := time.Duration(0) if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { - matched := false var ttlNum uint32 for _, st := range ro.Stages { if st.Ttl.Unit != ro.Ttl.Unit { @@ -399,14 +399,14 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error if !selector.Matches(s.nodeLabels) { continue } - matched = true + ttl.Num += ttlNum shardNum = st.ShardNum segInterval = st.SegmentInterval + if st.Close { + segmentIdleTimeout = 5 * time.Minute + } break } - if matched { - ttl.Num += ttlNum - } } opts := storage.TSDBOpts[*tsTable, option]{ ShardNum: shardNum, @@ -420,6 +420,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), StorageMetricsFactory: factory, SegmentBoundaryUpdateFn: s.metadata.UpdateSegmentsBoundary, + SegmentIdleTimeout: segmentIdleTimeout, } return storage.OpenTSDB( common.SetPosition(context.Background(), func(_ common.Position) common.Position { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 9fc92260..27b1ff0f 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -89,7 +89,10 @@ func (s *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr tsdb = db.(storage.TSDB[*tsTable, option]) } - segments := tsdb.SelectSegments(*mqo.TimeRange) + segments, err := tsdb.SelectSegments(*mqo.TimeRange) + if err != nil { + return nil, err + } if len(segments) < 1 { return nilResult, nil } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 8d32d6a0..d49baeef 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -87,7 +87,9 @@ func (p *pub) Serve() run.StopNotify { return p.closer.CloseNotify() } -func bypassMatches(_ map[string]string) bool { return true } +var bypassMatches = []MatchFunc{bypassMatch} + +func bypassMatch(_ map[string]string) bool { return true } func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) { var nodes []*databasev1.Node @@ -106,23 +108,28 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Mes } } else { for g, sel := range messages.NodeSelectors() { - var matches func(map[string]string) bool - if sel == "" { + var matches []MatchFunc + if sel == nil { matches = bypassMatches } else { - selector, err := ParseLabelSelector(sel) - if err != nil { - return nil, fmt.Errorf("failed to parse node selector: %w", err) + for _, s := range sel { + selector, err := ParseLabelSelector(s) + if err != nil { + return nil, fmt.Errorf("failed to parse node selector: %w", err) + } + matches = append(matches, selector.Matches) } - matches = selector.Matches } for _, n := range nodes { tr := metadata.FindSegmentsBoundary(n, g) if tr == nil { continue } - if matches(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) { - names[n.Metadata.Name] = struct{}{} + for _, m := range matches { + if m(n.Labels) && timestamp.PbHasOverlap(messages.TimeRange(), tr) { + names[n.Metadata.Name] = struct{}{} + break + } } } } diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go index 9abb59f2..5bd525e8 100644 --- a/banyand/queue/pub/pub_test.go +++ b/banyand/queue/pub/pub_test.go @@ -327,8 +327,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) p.OnAddOrUpdate(node2) - nodeSelectors := map[string]string{ - group1: "", + nodeSelectors := 
map[string][]string{ + group1: {""}, } ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, @@ -384,8 +384,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { node2 := getDataNodeWithLabels("node2", addr2, node2Labels, node2Boundaries) p.OnAddOrUpdate(node2) - nodeSelectors := map[string]string{ - group1: "role=ingest", + nodeSelectors := map[string][]string{ + group1: {"role=ingest"}, } ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, @@ -459,8 +459,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { End: timestamppb.New(now.Add(-1 * time.Hour).Add(-30 * time.Minute)), } - nodeSelectors := map[string]string{ - group1: "", + nodeSelectors := map[string][]string{ + group1: {""}, } ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, @@ -553,8 +553,8 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { End: timestamppb.New(now.Add(30 * time.Minute)), } - nodeSelectors := map[string]string{ - group1: "env=prod", + nodeSelectors := map[string][]string{ + group1: {"env=prod"}, } ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, diff --git a/banyand/queue/pub/selector.go b/banyand/queue/pub/selector.go index fb9b741e..fc277435 100644 --- a/banyand/queue/pub/selector.go +++ b/banyand/queue/pub/selector.go @@ -29,6 +29,9 @@ type LabelSelector struct { criteria []condition } +// MatchFunc is a function that matches labels. +type MatchFunc func(labels map[string]string) bool + type condition struct { Key string Values []string diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 4891b373..55624e6f 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -296,8 +296,8 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error shardNum := ro.ShardNum ttl := ro.Ttl segInterval := ro.SegmentInterval + segmentIdleTimeout := time.Duration(0) if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { - matched := false var ttlNum uint32 for _, st := range ro.Stages { if st.Ttl.Unit != ro.Ttl.Unit { @@ -311,14 +311,14 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error if !selector.Matches(s.nodeLabels) { continue } - matched = true + ttl.Num += ttlNum shardNum = st.ShardNum segInterval = st.SegmentInterval + if st.Close { + segmentIdleTimeout = 5 * time.Minute + } break } - if matched { - ttl.Num += ttlNum - } } opts := storage.TSDBOpts[*tsTable, option]{ ShardNum: shardNum, @@ -332,6 +332,7 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), StorageMetricsFactory: s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))), SegmentBoundaryUpdateFn: s.metadata.UpdateSegmentsBoundary, + SegmentIdleTimeout: segmentIdleTimeout, } return storage.OpenTSDB( common.SetPosition(context.Background(), func(_ common.Position) common.Position { diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 0308a278..8d195e62 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -43,15 +43,55 @@ import ( const checkDoneEvery = 128 func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) { + if err = validateQueryInput(sqo); err != nil { + return nil, err + } + + tsdb, err := s.getTSDB() + if err != nil { + return nil, err + } + + segments, err := tsdb.SelectSegments(*sqo.TimeRange) + if err != nil { + return nil, err + } + if len(segments) < 1 { + return 
bypassQueryResultInstance, nil + } + + defer func() { + if err != nil { + sqr.Release() + } + }() + + series := prepareSeriesData(sqo) + qo := prepareQueryOptions(sqo) + tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, true) + + if sqo.Order == nil || sqo.Order.Index == nil { + return s.executeTimeSeriesQuery(segments, series, qo, &tr), nil + } + + return s.executeIndexedQuery(ctx, segments, series, sqo, &tr) +} + +func validateQueryInput(sqo model.StreamQueryOptions) error { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { - return nil, errors.New("invalid query options: timeRange and series are required") + return errors.New("invalid query options: timeRange and series are required") } if len(sqo.TagProjection) == 0 { - return nil, errors.New("invalid query options: tagProjection is required") + return errors.New("invalid query options: tagProjection is required") } + return nil +} + +func (s *stream) getTSDB() (storage.TSDB[*tsTable, option], error) { var tsdb storage.TSDB[*tsTable, option] db := s.tsdb.Load() if db == nil { + var err error tsdb, err = s.schemaRepo.loadTSDB(s.group) if err != nil { return nil, err @@ -60,15 +100,10 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m } else { tsdb = db.(storage.TSDB[*tsTable, option]) } - segments := tsdb.SelectSegments(*sqo.TimeRange) - if len(segments) < 1 { - return bypassQueryResultInstance, nil - } - defer func() { - if err != nil { - sqr.Release() - } - }() + return tsdb, nil +} + +func prepareSeriesData(sqo model.StreamQueryOptions) []*pbv1.Series { series := make([]*pbv1.Series, len(sqo.Entities)) for i := range sqo.Entities { series[i] = &pbv1.Series{ @@ -76,30 +111,90 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m EntityValues: sqo.Entities[i], } } - qo := queryOptions{ + return series +} + +func prepareQueryOptions(sqo model.StreamQueryOptions) queryOptions { + return queryOptions{ StreamQueryOptions: sqo, minTimestamp: sqo.TimeRange.Start.UnixNano(), maxTimestamp: sqo.TimeRange.End.UnixNano(), } - tr := index.NewIntRangeOpts(qo.minTimestamp, qo.maxTimestamp, true, true) +} - if sqo.Order == nil || sqo.Order.Index == nil { - result := &tsResult{ - segments: segments, - series: series, - qo: qo, - sm: s, - pm: s.pm, - l: s.l, - tr: &tr, - } - if sqo.Order == nil { - result.asc = true - } else if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.asc = true - } - return result, nil +func (s *stream) executeTimeSeriesQuery( + segments []storage.Segment[*tsTable, option], + series []*pbv1.Series, + qo queryOptions, + tr *index.RangeOpts, +) model.StreamQueryResult { + result := &tsResult{ + segments: segments, + series: series, + qo: qo, + sm: s, + pm: s.pm, + l: s.l, + tr: tr, + } + + // Determine ascending order + if qo.Order == nil { + result.asc = true + } else if qo.Order.Sort == modelv1.Sort_SORT_ASC || qo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.asc = true + } + + return result +} + +func (s *stream) executeIndexedQuery( + ctx context.Context, + segments []storage.Segment[*tsTable, option], + series []*pbv1.Series, + sqo model.StreamQueryOptions, + tr *index.RangeOpts, +) (model.StreamQueryResult, error) { + result, seriesFilter, resultTS, err := s.processSegmentsAndBuildFilters(ctx, segments, series, sqo, tr) + if err != nil { + return nil, err } + + if seriesFilter.IsEmpty() { + result.Release() + return nil, nil + } + + // Update time range if needed + sids := 
seriesFilter.ToSlice() + startTS := sqo.TimeRange.Start.UnixNano() + endTS := sqo.TimeRange.End.UnixNano() + minTS, maxTS := updateTimeRange(resultTS, startTS, endTS) + if minTS > startTS || maxTS < endTS { + newTR := timestamp.NewTimeRange(time.Unix(0, minTS), time.Unix(0, maxTS), sqo.TimeRange.IncludeStart, sqo.TimeRange.IncludeEnd) + sqo.TimeRange = &newTR + } + + // Perform index-based sorting + if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil { + return nil, err + } + + // Set ascending flag + if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.asc = true + } + + return &result, nil +} + +func (s *stream) processSegmentsAndBuildFilters( + ctx context.Context, + segments []storage.Segment[*tsTable, option], + series []*pbv1.Series, + sqo model.StreamQueryOptions, + tr *index.RangeOpts, +) (idxResult, posting.List, posting.List, error) { var result idxResult result.pm = s.pm result.segments = segments @@ -108,32 +203,39 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m StreamQueryOptions: sqo, seriesToEntity: make(map[common.SeriesID][]*modelv1.TagValue), } - var sl pbv1.SeriesList + seriesFilter := roaring.NewPostingList() var resultTS posting.List + var sl pbv1.SeriesList + var err error + for i := range result.segments { sl, err = result.segments[i].Lookup(ctx, series) if err != nil { - return nil, err + return result, nil, nil, err } + var filter, filterTS posting.List - if filter, filterTS, err = indexSearch(ctx, sqo, segments[i].Tables(), sl.ToList().ToSlice(), &tr); err != nil { - return nil, err + if filter, filterTS, err = indexSearch(ctx, sqo, segments[i].Tables(), sl.ToList().ToSlice(), tr); err != nil { + return result, nil, nil, err } + if filter != nil && filter.IsEmpty() { continue } + if result.qo.elementFilter == nil { result.qo.elementFilter = filter resultTS = filterTS } else { if err = result.qo.elementFilter.Union(filter); err != nil { - return nil, err + return result, nil, nil, err } if err = resultTS.Union(filterTS); err != nil { - return nil, err + return result, nil, nil, err } } + for j := range sl { if seriesFilter.Contains(uint64(sl[j].ID)) { continue @@ -141,28 +243,11 @@ func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr m seriesFilter.Insert(uint64(sl[j].ID)) result.qo.seriesToEntity[sl[j].ID] = sl[j].EntityValues } + result.tabs = append(result.tabs, result.segments[i].Tables()...) 
} - if seriesFilter.IsEmpty() { - result.Release() - return nil, nil - } - sids := seriesFilter.ToSlice() - startTS := sqo.TimeRange.Start.UnixNano() - endTS := sqo.TimeRange.End.UnixNano() - minTS, maxTS := updateTimeRange(resultTS, startTS, endTS) - if minTS > startTS || maxTS < endTS { - newTR := timestamp.NewTimeRange(time.Unix(0, minTS), time.Unix(0, maxTS), sqo.TimeRange.IncludeStart, sqo.TimeRange.IncludeEnd) - sqo.TimeRange = &newTR - } - if result.sortingIter, err = s.indexSort(ctx, sqo, result.tabs, sids); err != nil { - return nil, err - } - if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.asc = true - } - return &result, nil + return result, seriesFilter, resultTS, nil } type queryOptions struct { diff --git a/docs/api-reference.md b/docs/api-reference.md index 9e13a442..c90c4b39 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -481,7 +481,7 @@ Metadata is for multi-tenant, multi-model use | segment_interval | [IntervalRule](#banyandb-common-v1-IntervalRule) | | segment_interval indicates the length of a segment | | ttl | [IntervalRule](#banyandb-common-v1-IntervalRule) | | ttl indicates time to live, how long the data will be cached | | stages | [LifecycleStage](#banyandb-common-v1-LifecycleStage) | repeated | stages defines the ordered lifecycle stages. Data progresses through these stages sequentially. | -| default_node_selector | [string](#string) | | default_node_selector is the default node selector for queries if node_selector is not specified | +| default_stages | [string](#string) | repeated | default_stages are the names of the default stages | @@ -2961,7 +2961,7 @@ QueryRequest is the request contract for query. | limit | [uint32](#uint32) | | limit is used to impose a boundary on the number of records being returned. If top is specified, limit processes the dataset based on top's output | | order_by | [banyandb.model.v1.QueryOrder](#banyandb-model-v1-QueryOrder) | | order_by is given to specify the sort for a tag. | | trace | [bool](#bool) | | trace is used to enable trace for the query | -| node_selector | [string](#string) | | node_selector is used to specify the target node for the query | +| stages | [string](#string) | repeated | stages is used to specify the stages of the data points in the lifecycle | @@ -3471,7 +3471,6 @@ QueryRequest is the request contract for query. | tag_projection | [string](#string) | repeated | tag_projection can be used to select tags of the data points in the response | | limit | [uint32](#uint32) | | | | trace | [bool](#bool) | | trace is used to enable trace for the query | -| node_selector | [string](#string) | | node_selector is used to select the node to query | @@ -3571,7 +3570,7 @@ QueryRequest is the request contract for query. | criteria | [banyandb.model.v1.Criteria](#banyandb-model-v1-Criteria) | | tag_families are indexed.
| | projection | [banyandb.model.v1.TagProjection](#banyandb-model-v1-TagProjection) | | projection can be used to select the key names of the element in the response | | trace | [bool](#bool) | | trace is used to enable trace for the query | -| node_selector | [string](#string) | | node_selector is used to select the node to query | +| stages | [string](#string) | repeated | stages is used to specify the stages of the elements in the lifecycle | diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index ccbd390a..745000d0 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -48,7 +48,7 @@ type ( // Message is sent on the bus to all subscribed listeners. type Message struct { payload payload - nodeSelectors map[string]string + nodeSelectors map[string][]string timeRange *modelv1.TimeRange node string id MessageID @@ -71,7 +71,7 @@ func (m Message) Node() string { } // NodeSelectors returns the node selectors of the Message. -func (m Message) NodeSelectors() map[string]string { +func (m Message) NodeSelectors() map[string][]string { return m.nodeSelectors } @@ -102,7 +102,7 @@ func NewMessageWithNode(id MessageID, node string, data interface{}) Message { // NewMessageWithNodeSelectors returns a new Message with a MessageID and NodeSelectors and embedded data. // Nodes matching any of the selectors will receive the message. -func NewMessageWithNodeSelectors(id MessageID, nodeSelectors map[string]string, timeRange *modelv1.TimeRange, data interface{}) Message { +func NewMessageWithNodeSelectors(id MessageID, nodeSelectors map[string][]string, timeRange *modelv1.TimeRange, data interface{}) Message { return Message{id: id, nodeSelectors: nodeSelectors, timeRange: timeRange, payload: data} }
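To put the schema side of this change in context: a group's ResourceOpts can now declare lifecycle stages plus the new default_stages that queries fall back to. The sketch below is not part of the commit; the generated-type import path is assumed to follow this repository's layout, and the stage name and node labels are illustrative. The field names mirror the proto and the st.Close / SegmentIdleTimeout wiring shown in metadata.go above.

package main

import (
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
)

// exampleResourceOpts sketches a group whose data ages into a "warm" stage;
// queries that omit `stages` read "warm" because it is in default_stages.
func exampleResourceOpts() *commonv1.ResourceOpts {
	day := func(n uint32) *commonv1.IntervalRule {
		return &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: n}
	}
	return &commonv1.ResourceOpts{
		ShardNum:        2,
		SegmentInterval: day(1),
		Ttl:             day(7),
		Stages: []*commonv1.LifecycleStage{{
			Name:            "warm",      // illustrative stage name
			ShardNum:        1,
			SegmentInterval: day(1),
			Ttl:             day(30),
			NodeSelector:    "type=warm", // illustrative node label
			// Close lets idle segments on this stage be shut down;
			// the diff wires it to a 5*time.Minute SegmentIdleTimeout.
			Close: true,
		}},
		// Queries that do not set `stages` fall back to these names.
		DefaultStages: []string{"warm"},
	}
}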
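On the query side, a client now names lifecycle stages instead of pinning a node with the removed node_selector; the liaison resolves each stage name to its node selector via groupRepo.getNodeSelector above. A minimal sketch, assuming the generated packages, the default liaison port, and the group/measure names (none of which are defined by this commit):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/timestamppb"

	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func main() {
	conn, err := grpc.NewClient("localhost:17912", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	now := time.Now()
	req := &measurev1.QueryRequest{
		Groups: []string{"sw_metric"},  // assumed group
		Name:   "service_cpm_minute",   // assumed measure
		TimeRange: &modelv1.TimeRange{
			Begin: timestamppb.New(now.Add(-24 * time.Hour)),
			End:   timestamppb.New(now),
		},
		// stages replaces node_selector: the query fans out to every node
		// whose stage selector matches one of these names; when empty, the
		// group's default_stages apply.
		Stages: []string{"warm"},
	}

	resp, err := measurev1.NewMeasureServiceClient(conn).Query(context.Background(), req)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got %d data points", len(resp.DataPoints))
}

Stream queries take the same repeated `stages` field (field 10 in stream/v1/query.proto above), so the routing behaves identically for both models.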