This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 945857d  Fix data race (#169)
945857d is described below

commit 945857d771a5b8eb3aa6d5a34da0ed6297995176
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Sep 2 00:09:45 2022 +0800

    Fix data race (#169)
---
 pkg/schema/metadata.go | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 7e92376..eb7b8aa 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -205,7 +205,7 @@ func (sr *schemaRepo) StoreGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
                sr.data[name] = g
                return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
        }
-       prevGroupSchema := g.groupSchema
+       prevGroupSchema := g.GetSchema()
        if groupSchema.GetMetadata().GetModRevision() <= 
prevGroupSchema.Metadata.ModRevision {
                return g, nil
        }
@@ -226,7 +226,7 @@ func (sr *schemaRepo) StoreGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
        if err != nil {
                return nil, err
        }
-       g.groupSchema = groupSchema
+       g.groupSchema.Store(groupSchema)
        return g, nil
 }
 
@@ -243,7 +243,7 @@ func (sr *schemaRepo) deleteGroup(groupMeta 
*commonv1.Metadata) error {
        if err != nil {
                return err
        }
-       _ = sr.notify(g.groupSchema, databasev1.Action_ACTION_DELETE)
+       _ = sr.notify(g.GetSchema(), databasev1.Action_ACTION_DELETE)
        delete(sr.data, name)
        return nil
 }
@@ -328,7 +328,7 @@ func (sr *schemaRepo) NotifyAll() (err error) {
        sr.RLock()
        defer sr.RUnlock()
        for _, g := range sr.data {
-               err = multierr.Append(err, sr.notify(g.groupSchema, 
databasev1.Action_ACTION_PUT))
+               err = multierr.Append(err, sr.notify(g.GetSchema(), 
databasev1.Action_ACTION_PUT))
                g.mapMutex.RLock()
                for _, s := range g.schemaMap {
                        err = multierr.Append(err, g.notify(s, 
databasev1.Action_ACTION_PUT))
@@ -356,7 +356,7 @@ func (sr *schemaRepo) Close() {
        for _, g := range sr.data {
                err := g.close()
                if err != nil {
-                       sr.l.Err(err).Stringer("group", 
g.groupSchema.Metadata).Msg("closing")
+                       sr.l.Err(err).Stringer("group", 
g.GetSchema().Metadata).Msg("closing")
                }
        }
 }
@@ -364,7 +364,7 @@ func (sr *schemaRepo) Close() {
 var _ Group = (*group)(nil)
 
 type group struct {
-       groupSchema      *commonv1.Group
+       groupSchema      atomic.Pointer[commonv1.Group]
        l                *logger.Logger
        resourceSupplier ResourceSupplier
        repo             discovery.ServiceRepo
@@ -385,7 +385,7 @@ func newGroup(
        entityTopic bus.Topic,
 ) *group {
        g := &group{
-               groupSchema:      groupSchema,
+               groupSchema:      atomic.Pointer[commonv1.Group]{},
                repo:             repo,
                metadata:         metadata,
                l:                l,
@@ -393,12 +393,13 @@ func newGroup(
                resourceSupplier: resourceSupplier,
                entityTopic:      entityTopic,
        }
+       g.groupSchema.Store(groupSchema)
        g.db.Store(db)
        return g
 }
 
 func (g *group) GetSchema() *commonv1.Group {
-       return g.groupSchema
+       return g.groupSchema.Load()
 }
 
 func (g *group) SupplyTSDB() tsdb.Database {
@@ -436,7 +437,7 @@ func (g *group) StoreResource(resourceSchema 
ResourceSchema) (Resource, error) {
        if errIndexRules != nil {
                return nil, errIndexRules
        }
-       sm, errTS := 
g.resourceSupplier.OpenResource(g.groupSchema.GetResourceOpts().ShardNum, g, 
ResourceSpec{
+       sm, errTS := 
g.resourceSupplier.OpenResource(g.GetSchema().GetResourceOpts().ShardNum, g, 
ResourceSpec{
                Schema:     resourceSchema,
                IndexRules: idxRules,
        })

Reply via email to