This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/branch-1.5 by this push: new 389314e0 [YUNIKORN-2629] Adding a node can result in a deadlock 389314e0 is described below commit 389314e002a150bf4285056dc4fad486b9d888b9 Author: Peter Bacsko <pbac...@cloudera.com> AuthorDate: Thu May 16 22:22:27 2024 +0200 [YUNIKORN-2629] Adding a node can result in a deadlock --- pkg/cache/context.go | 57 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index ff6e89ed..bea59346 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -1394,7 +1394,7 @@ func (ctx *Context) InitializeState() error { log.Log(log.ShimContext).Error("failed to load nodes", zap.Error(err)) return err } - acceptedNodes, err := ctx.registerNodes(nodes) + acceptedNodes, err := ctx.RegisterNodes(nodes) if err != nil { log.Log(log.ShimContext).Error("failed to register nodes", zap.Error(err)) return err @@ -1510,11 +1510,17 @@ func (ctx *Context) registerNode(node *v1.Node) error { return nil } +func (ctx *Context) RegisterNodes(nodes []*v1.Node) ([]*v1.Node, error) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + return ctx.registerNodes(nodes) +} + +// registerNodes registers the nodes to the scheduler core. +// This method must be called while holding the Context write lock. 
func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { nodesToRegister := make([]*si.NodeInfo, 0) pendingNodes := make(map[string]*v1.Node) - acceptedNodes := make([]*v1.Node, 0) - rejectedNodes := make([]*v1.Node, 0) // Generate a NodeInfo object for each node and add to the registration request for _, node := range nodes { @@ -1535,12 +1541,34 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { pendingNodes[node.Name] = node } - var wg sync.WaitGroup + acceptedNodes, rejectedNodes, err := ctx.registerNodesInternal(nodesToRegister, pendingNodes) + if err != nil { + log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err)) + return nil, err + } + + for _, node := range acceptedNodes { + // post a successful event to the node + events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted", + fmt.Sprintf("node %s is accepted by the scheduler", node.Name)) + } + for _, node := range rejectedNodes { + // post a failure event to the node + events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected", + fmt.Sprintf("node %s is rejected by the scheduler", node.Name)) + } + return acceptedNodes, nil +} + +func (ctx *Context) registerNodesInternal(nodesToRegister []*si.NodeInfo, pendingNodes map[string]*v1.Node) ([]*v1.Node, []*v1.Node, error) { + acceptedNodes := make([]*v1.Node, 0) + rejectedNodes := make([]*v1.Node, 0) + + var wg sync.WaitGroup // initialize wait group with the number of responses we expect wg.Add(len(pendingNodes)) - // register with the dispatcher so that we can track our response handlerID := fmt.Sprintf("%s-%d", registerNodeContextHandler, ctx.txnID.Add(1)) dispatcher.RegisterEventHandler(handlerID, dispatcher.EventTypeNode, func(event interface{}) { nodeEvent, ok := event.(CachedSchedulerNodeEvent) @@ -1572,24 +1600,17 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { RmID: 
schedulerconf.GetSchedulerConf().ClusterID, }); err != nil { log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err)) - return nil, err + return nil, nil, err } + // write lock must always be held at this point, releasing it while waiting to avoid any potential deadlocks + ctx.lock.Unlock() + defer ctx.lock.Lock() + // wait for all responses to accumulate wg.Wait() - for _, node := range acceptedNodes { - // post a successful event to the node - events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted", - fmt.Sprintf("node %s is accepted by the scheduler", node.Name)) - } - for _, node := range rejectedNodes { - // post a failure event to the node - events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected", - fmt.Sprintf("node %s is rejected by the scheduler", node.Name)) - } - - return acceptedNodes, nil + return acceptedNodes, rejectedNodes, nil } func (ctx *Context) decommissionNode(node *v1.Node) error { --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@yunikorn.apache.org For additional commands, e-mail: issues-help@yunikorn.apache.org