This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new 945857d  Fix data race (#169)
945857d is described below

commit 945857d771a5b8eb3aa6d5a34da0ed6297995176
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Sep 2 00:09:45 2022 +0800

    Fix data race (#169)
---
 pkg/schema/metadata.go | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 7e92376..eb7b8aa 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -205,7 +205,7 @@ func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) {
 		sr.data[name] = g
 		return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
 	}
-	prevGroupSchema := g.groupSchema
+	prevGroupSchema := g.GetSchema()
 	if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision {
 		return g, nil
 	}
@@ -226,7 +226,7 @@ func (sr *schemaRepo) StoreGroup(groupMeta *commonv1.Metadata) (*group, error) {
 	if err != nil {
 		return nil, err
 	}
-	g.groupSchema = groupSchema
+	g.groupSchema.Store(groupSchema)
 	return g, nil
 }
 
@@ -243,7 +243,7 @@ func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
 	if err != nil {
 		return err
 	}
-	_ = sr.notify(g.groupSchema, databasev1.Action_ACTION_DELETE)
+	_ = sr.notify(g.GetSchema(), databasev1.Action_ACTION_DELETE)
 	delete(sr.data, name)
 	return nil
 }
@@ -328,7 +328,7 @@ func (sr *schemaRepo) NotifyAll() (err error) {
 	sr.RLock()
 	defer sr.RUnlock()
 	for _, g := range sr.data {
-		err = multierr.Append(err, sr.notify(g.groupSchema, databasev1.Action_ACTION_PUT))
+		err = multierr.Append(err, sr.notify(g.GetSchema(), databasev1.Action_ACTION_PUT))
 		g.mapMutex.RLock()
 		for _, s := range g.schemaMap {
 			err = multierr.Append(err, g.notify(s, databasev1.Action_ACTION_PUT))
@@ -356,7 +356,7 @@ func (sr *schemaRepo) Close() {
 	for _, g := range sr.data {
 		err := g.close()
 		if err != nil {
-			sr.l.Err(err).Stringer("group", g.groupSchema.Metadata).Msg("closing")
+			sr.l.Err(err).Stringer("group", g.GetSchema().Metadata).Msg("closing")
 		}
 	}
 }
@@ -364,7 +364,7 @@ func (sr *schemaRepo) Close() {
 var _ Group = (*group)(nil)
 
 type group struct {
-	groupSchema *commonv1.Group
+	groupSchema atomic.Pointer[commonv1.Group]
 	l *logger.Logger
 	resourceSupplier ResourceSupplier
 	repo discovery.ServiceRepo
@@ -385,7 +385,7 @@ func newGroup(
 	entityTopic bus.Topic,
 ) *group {
 	g := &group{
-		groupSchema: groupSchema,
+		groupSchema: atomic.Pointer[commonv1.Group]{},
 		repo: repo,
 		metadata: metadata,
 		l: l,
@@ -393,12 +393,13 @@ func newGroup(
 		resourceSupplier: resourceSupplier,
 		entityTopic: entityTopic,
 	}
+	g.groupSchema.Store(groupSchema)
 	g.db.Store(db)
 	return g
 }
 
 func (g *group) GetSchema() *commonv1.Group {
-	return g.groupSchema
+	return g.groupSchema.Load()
 }
 
 func (g *group) SupplyTSDB() tsdb.Database {
@@ -436,7 +437,7 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
 	if errIndexRules != nil {
 		return nil, errIndexRules
 	}
-	sm, errTS := g.resourceSupplier.OpenResource(g.groupSchema.GetResourceOpts().ShardNum, g, ResourceSpec{
+	sm, errTS := g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, ResourceSpec{
 		Schema: resourceSchema,
 		IndexRules: idxRules,
 	})