This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch tsdb in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a8a832ca06c627275ccb8f913da8f4d0991d56d7 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Mar 14 11:55:24 2024 +0000 Fix: nil tsdb Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- banyand/measure/metadata.go | 6 +++++- banyand/measure/query.go | 6 +++++- banyand/stream/metadata.go | 6 +++++- banyand/stream/query.go | 18 +++++++++++++++--- pkg/schema/metadata.go | 15 ++++++++++++--- pkg/wal/README.md | 2 ++ 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index c6a89320..dece4a1f 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -225,7 +225,11 @@ func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option] if !ok { return nil, fmt.Errorf("group %s not found", groupName) } - return g.SupplyTSDB().(storage.TSDB[*tsTable, option]), nil + db := g.SupplyTSDB() + if db == nil { + return nil, fmt.Errorf("group %s not found", groupName) + } + return db.(storage.TSDB[*tsTable, option]), nil } var _ resourceSchema.ResourceSupplier = (*supplier)(nil) diff --git a/banyand/measure/query.go b/banyand/measure/query.go index bf1c2d7c..cf34a914 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -71,7 +71,11 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 { return nil, errors.New("invalid query options: tagProjection or fieldProjection is required") } - tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable, option]) + db := s.databaseSupplier.SupplyTSDB() + if db == nil { + return nil, errors.New("cannot get tsdb") + } + tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange) defer func() { for i := range tabWrappers { diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 9ef65105..5e823902 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -213,7 +213,11 @@ func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option] if !ok { return nil, fmt.Errorf("group %s not found", groupName) } - return g.SupplyTSDB().(storage.TSDB[*tsTable, option]), nil + db := g.SupplyTSDB() + if db == nil { + return nil, fmt.Errorf("tsdb for group %s not found", groupName) + } + return db.(storage.TSDB[*tsTable, option]), nil } var _ resourceSchema.ResourceSupplier = (*supplier)(nil) diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 92aca49b..0bb2af8c 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -363,7 +363,11 @@ func (s *stream) Filter(ctx context.Context, sfo pbv1.StreamFilterOptions) (sfr if len(sfo.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } - tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable, option]) + db := s.databaseSupplier.SupplyTSDB() + if db == nil { + return nil, errors.New("no tsdb found") + } + tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*sfo.TimeRange) sort.Slice(tabWrappers, func(i, j int) bool { return tabWrappers[i].GetTimeRange().Start.Before(tabWrappers[j].GetTimeRange().Start) @@ -430,7 +434,11 @@ func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr pbv1 if len(sso.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } - tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable, option]) + db := s.databaseSupplier.SupplyTSDB() + if db == nil { + return nil, errors.New("no tsdb found") + } + tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*sso.TimeRange) defer func() { for i := range tabWrappers { @@ -507,7 +515,11 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S if len(sqo.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } - tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable, option]) + db := s.databaseSupplier.SupplyTSDB() + if db == nil { + return nil, errors.New("no tsdb found") + } + tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) defer func() { for i := range tabWrappers { diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go index 75979ebd..36cfdc10 100644 --- a/pkg/schema/metadata.go +++ b/pkg/schema/metadata.go @@ -253,7 +253,9 @@ func (sr *schemaRepo) storeGroup(groupMeta *commonv1.Metadata) (*group, error) { } sr.l.Info().Str("group", name).Msg("closing the previous tsdb") db := g.SupplyTSDB() - db.Close() + if db != nil { + db.Close() + } sr.l.Info().Str("group", name).Msg("creating a new tsdb") if err := g.init(name); err != nil { return nil, err @@ -442,7 +444,10 @@ func (g *group) GetSchema() *commonv1.Group { } func (g *group) SupplyTSDB() io.Closer { - return g.db.Load().(io.Closer) + if v := g.db.Load(); v != nil { + return v.(io.Closer) + } + return nil } func (g *group) initResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) { @@ -530,7 +535,11 @@ func (g *group) close() (err error) { if !g.isInit() || g.isPortable() { return nil } - return multierr.Append(err, g.SupplyTSDB().Close()) + db := g.SupplyTSDB() + if db != nil { + err = multierr.Append(err, db.Close()) + } + return err } func parseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { diff --git a/pkg/wal/README.md b/pkg/wal/README.md index e5d7b877..82068238 100644 --- a/pkg/wal/README.md +++ b/pkg/wal/README.md @@ -1,5 +1,7 @@ # WAL +Deprecation Notice: This package is deprecated and will be removed in the future. + ## Benchmark Testing environment: