This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 06e06d47b fix(go-client): fix the data race issue caused by concurrent 
read/write access to the session list of `metaCall` (#2257)
06e06d47b is described below

commit 06e06d47bfdf7dde5595c2ac407973f2fb3939c4
Author: Dan Wang <[email protected]>
AuthorDate: Wed May 28 15:28:20 2025 +0800

    fix(go-client): fix the data race issue caused by concurrent read/write 
access to the session list of `metaCall` (#2257)
    
    Fix https://github.com/apache/incubator-pegasus/issues/2256.
    
    The data race was caused by:
    1. The `issueBackupMetas()` function returns without waiting for all 
subroutines
    to complete;
    2. Concurrent access (read/write) to the `metas` member (the session list) 
of the
    `metaCall` struct was not properly synchronized with mutual exclusion.
---
 go-client/session/meta_call.go    | 63 ++++++++++++++++++++++++++++-----------
 go-client/session/meta_session.go |  5 ++--
 2 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go
index d846aa09b..6b68a59cb 100644
--- a/go-client/session/meta_call.go
+++ b/go-client/session/meta_call.go
@@ -68,10 +68,13 @@ func newMetaCall(lead int, metas []*metaSession, callFunc 
metaCallFunc, meatIPAd
 func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
        // the subroutines will be cancelled when this call ends
        subCtx, cancel := context.WithCancel(ctx)
-       wg := &sync.WaitGroup{}
+
+       var wg sync.WaitGroup
        wg.Add(2) // this waitgroup is used to ensure all goroutines exit after 
Run ends.
 
        go func() {
+               defer wg.Done()
+
                // issue RPC to leader
                if !c.issueSingleMeta(subCtx, c.lead) {
                        select {
@@ -81,10 +84,11 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, 
error) {
                                // RPC to the backup.
                        }
                }
-               wg.Done()
        }()
 
        go func() {
+               defer wg.Done()
+
                // Automatically issue backup RPC after a period
                // when the current leader is suspected unavailable.
                select {
@@ -94,7 +98,6 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, 
error) {
                        c.issueBackupMetas(subCtx)
                case <-subCtx.Done():
                }
-               wg.Done()
        }()
 
        // The result of meta query is always a context error, or success.
@@ -112,14 +115,17 @@ func (c *metaCall) Run(ctx context.Context) 
(metaResponse, error) {
 
 // issueSingleMeta returns false if we should try another meta
 func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
+       c.lock.RLock()
        meta := c.metas[curLeader]
-       resp, err := c.callFunc(ctx, meta)
+       c.lock.RUnlock()
 
+       resp, err := c.callFunc(ctx, meta)
        if err == nil && resp.GetErr().Errno == 
base.ERR_FORWARD_TO_OTHERS.String() {
-               forwardAddr := c.getMetaServiceForwardAddress(resp)
+               forwardAddr := getMetaServiceForwardAddress(resp)
                if forwardAddr == nil {
                        return false
                }
+
                addr := forwardAddr.GetAddress()
                found := false
                c.lock.Lock()
@@ -130,6 +136,7 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, 
curLeader int) bool {
                        }
                }
                c.lock.Unlock()
+
                if !found {
                        c.lock.Lock()
                        c.metaIPAddrs = append(c.metaIPAddrs, addr)
@@ -137,16 +144,19 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, 
curLeader int) bool {
                                NodeSession: newNodeSession(addr, NodeTypeMeta),
                                logger:      pegalog.GetLogger(),
                        })
-                       c.lock.Unlock()
                        curLeader = len(c.metas) - 1
-                       c.metas[curLeader].logger.Printf("add forward address 
%s as meta server", addr)
-                       resp, err = c.callFunc(ctx, c.metas[curLeader])
+                       meta = c.metas[curLeader]
+                       meta.logger.Printf("add forward address %s as meta 
server", addr)
+                       c.lock.Unlock()
+
+                       resp, err = c.callFunc(ctx, meta)
                }
        }
 
        if err != nil || resp.GetErr().Errno == 
base.ERR_FORWARD_TO_OTHERS.String() {
                return false
        }
+
        // the RPC succeeds, this meta becomes the new leader now.
        atomic.StoreUint32(&c.newLead, uint32(curLeader))
        select {
@@ -158,24 +168,43 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, 
curLeader int) bool {
 }
 
 func (c *metaCall) issueBackupMetas(ctx context.Context) {
-       for i := range c.metas {
-               if i == c.lead {
+       // In issueSingleMeta() function, c.metas might be modified, so here we 
first copy
+       // the contents of c.metas to another slice as a snapshot for range 
iteration.
+       c.lock.RLock()
+       metas := append([]*metaSession(nil), c.metas...)
+       lead := c.lead
+       c.lock.RUnlock()
+
+       var wg sync.WaitGroup
+       for i := range metas {
+               if i == lead {
                        continue
                }
-               // concurrently issue RPC to the rest of meta servers.
+
+               wg.Add(1)
+
+               // Send RPCs concurrently to all backup meta servers.
                go func(idx int) {
+                       defer wg.Done()
                        c.issueSingleMeta(ctx, idx)
                }(i)
        }
+
+       wg.Wait()
 }
 
-func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) 
*base.RPCAddress {
-       rep, ok := resp.(*replication.QueryCfgResponse)
-       if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
+func getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
+       queryCfgResp, ok := resp.(*replication.QueryCfgResponse)
+       if !ok || queryCfgResp.GetErr().Errno != 
base.ERR_FORWARD_TO_OTHERS.String() {
                return nil
-       } else if rep.GetPartitions() == nil || len(rep.GetPartitions()) == 0 {
+       }
+
+       if queryCfgResp.GetPartitions() == nil || 
len(queryCfgResp.GetPartitions()) == 0 {
                return nil
-       } else {
-               return rep.Partitions[0].Primary
        }
+
+       // The forward address will be put in partitions[0].primary if it exists.
+       // See query_cfg_response's definition in idl/dsn.layer2.thrift and
+       // meta_service::on_query_configuration_by_index().
+       return queryCfgResp.Partitions[0].Primary
 }
diff --git a/go-client/session/meta_session.go 
b/go-client/session/meta_session.go
index b0e962d1d..9ead2b85a 100644
--- a/go-client/session/meta_session.go
+++ b/go-client/session/meta_session.go
@@ -93,13 +93,14 @@ func NewMetaManager(addrs []string, creator 
NodeSessionCreator) *MetaManager {
 }
 
 func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) 
(metaResponse, error) {
-       lead := m.getCurrentLeader()
-       call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
+       call := newMetaCall(m.getCurrentLeader(), m.metas, callFunc, 
m.metaIPAddrs)
        resp, err := call.Run(ctx)
        if err == nil {
+               call.lock.RLock()
                m.setCurrentLeader(int(call.newLead))
                m.setNewMetas(call.metas)
                m.setMetaIPAddrs(call.metaIPAddrs)
+               call.lock.RUnlock()
        }
        return resp, err
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to