This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 06e06d47b fix(go-client): fix the data race issue caused by concurrent read/write access to the session list of `metaCall` (#2257)
06e06d47b is described below
commit 06e06d47bfdf7dde5595c2ac407973f2fb3939c4
Author: Dan Wang <[email protected]>
AuthorDate: Wed May 28 15:28:20 2025 +0800
fix(go-client): fix the data race issue caused by concurrent read/write access to the session list of `metaCall` (#2257)
Fixes https://github.com/apache/incubator-pegasus/issues/2256.
The data race had two causes:
1. The `issueBackupMetas()` function returned without waiting for all of the goroutines it spawned to complete.
2. Concurrent reads and writes of the `metas` member (the session list) of the `metaCall` struct were not guarded by a mutex.
---
go-client/session/meta_call.go | 63 ++++++++++++++++++++++++++++-----------
go-client/session/meta_session.go | 5 ++--
2 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go
index d846aa09b..6b68a59cb 100644
--- a/go-client/session/meta_call.go
+++ b/go-client/session/meta_call.go
@@ -68,10 +68,13 @@ func newMetaCall(lead int, metas []*metaSession, callFunc
metaCallFunc, meatIPAd
func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
// the subroutines will be cancelled when this call ends
subCtx, cancel := context.WithCancel(ctx)
- wg := &sync.WaitGroup{}
+
+ var wg sync.WaitGroup
wg.Add(2) // this waitgroup is used to ensure all goroutines exit after
Run ends.
go func() {
+ defer wg.Done()
+
// issue RPC to leader
if !c.issueSingleMeta(subCtx, c.lead) {
select {
@@ -81,10 +84,11 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse,
error) {
// RPC to the backup.
}
}
- wg.Done()
}()
go func() {
+ defer wg.Done()
+
// Automatically issue backup RPC after a period
// when the current leader is suspected unvailable.
select {
@@ -94,7 +98,6 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse,
error) {
c.issueBackupMetas(subCtx)
case <-subCtx.Done():
}
- wg.Done()
}()
// The result of meta query is always a context error, or success.
@@ -112,14 +115,17 @@ func (c *metaCall) Run(ctx context.Context)
(metaResponse, error) {
// issueSingleMeta returns false if we should try another meta
func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
+ c.lock.RLock()
meta := c.metas[curLeader]
- resp, err := c.callFunc(ctx, meta)
+ c.lock.RUnlock()
+ resp, err := c.callFunc(ctx, meta)
if err == nil && resp.GetErr().Errno ==
base.ERR_FORWARD_TO_OTHERS.String() {
- forwardAddr := c.getMetaServiceForwardAddress(resp)
+ forwardAddr := getMetaServiceForwardAddress(resp)
if forwardAddr == nil {
return false
}
+
addr := forwardAddr.GetAddress()
found := false
c.lock.Lock()
@@ -130,6 +136,7 @@ func (c *metaCall) issueSingleMeta(ctx context.Context,
curLeader int) bool {
}
}
c.lock.Unlock()
+
if !found {
c.lock.Lock()
c.metaIPAddrs = append(c.metaIPAddrs, addr)
@@ -137,16 +144,19 @@ func (c *metaCall) issueSingleMeta(ctx context.Context,
curLeader int) bool {
NodeSession: newNodeSession(addr, NodeTypeMeta),
logger: pegalog.GetLogger(),
})
- c.lock.Unlock()
curLeader = len(c.metas) - 1
- c.metas[curLeader].logger.Printf("add forward address
%s as meta server", addr)
- resp, err = c.callFunc(ctx, c.metas[curLeader])
+ meta = c.metas[curLeader]
+ meta.logger.Printf("add forward address %s as meta
server", addr)
+ c.lock.Unlock()
+
+ resp, err = c.callFunc(ctx, meta)
}
}
if err != nil || resp.GetErr().Errno ==
base.ERR_FORWARD_TO_OTHERS.String() {
return false
}
+
// the RPC succeeds, this meta becomes the new leader now.
atomic.StoreUint32(&c.newLead, uint32(curLeader))
select {
@@ -158,24 +168,43 @@ func (c *metaCall) issueSingleMeta(ctx context.Context,
curLeader int) bool {
}
func (c *metaCall) issueBackupMetas(ctx context.Context) {
- for i := range c.metas {
- if i == c.lead {
+ // In issueSingleMeta() function, c.metas might be modified, so here we
first copy
+ // the contents of c.metas to another slice as a snapshot for range
iteration.
+ c.lock.RLock()
+ metas := append([]*metaSession(nil), c.metas...)
+ lead := c.lead
+ c.lock.RUnlock()
+
+ var wg sync.WaitGroup
+ for i := range metas {
+ if i == lead {
continue
}
- // concurrently issue RPC to the rest of meta servers.
+
+ wg.Add(1)
+
+ // Send RPCs concurrently to all backup meta servers.
go func(idx int) {
+ defer wg.Done()
c.issueSingleMeta(ctx, idx)
}(i)
}
+
+ wg.Wait()
}
-func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse)
*base.RPCAddress {
- rep, ok := resp.(*replication.QueryCfgResponse)
- if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
+func getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
+ queryCfgResp, ok := resp.(*replication.QueryCfgResponse)
+ if !ok || queryCfgResp.GetErr().Errno !=
base.ERR_FORWARD_TO_OTHERS.String() {
return nil
- } else if rep.GetPartitions() == nil || len(rep.GetPartitions()) == 0 {
+ }
+
+ if queryCfgResp.GetPartitions() == nil ||
len(queryCfgResp.GetPartitions()) == 0 {
return nil
- } else {
- return rep.Partitions[0].Primary
}
+
+ // The forward address will be put in partitions[0].primary if exist.
+ // See query_cfg_response's definition in idl/dsn.layer2.thrift and
+ // meta_service::on_query_configuration_by_index().
+ return queryCfgResp.Partitions[0].Primary
}
diff --git a/go-client/session/meta_session.go
b/go-client/session/meta_session.go
index b0e962d1d..9ead2b85a 100644
--- a/go-client/session/meta_session.go
+++ b/go-client/session/meta_session.go
@@ -93,13 +93,14 @@ func NewMetaManager(addrs []string, creator
NodeSessionCreator) *MetaManager {
}
func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc)
(metaResponse, error) {
- lead := m.getCurrentLeader()
- call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
+ call := newMetaCall(m.getCurrentLeader(), m.metas, callFunc,
m.metaIPAddrs)
resp, err := call.Run(ctx)
if err == nil {
+ call.lock.RLock()
m.setCurrentLeader(int(call.newLead))
m.setNewMetas(call.metas)
m.setMetaIPAddrs(call.metaIPAddrs)
+ call.lock.RUnlock()
}
return resp, err
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]