This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 389314e0 [YUNIKORN-2629] Adding a node can result in a deadlock
389314e0 is described below

commit 389314e002a150bf4285056dc4fad486b9d888b9
Author: Peter Bacsko <pbac...@cloudera.com>
AuthorDate: Thu May 16 22:22:27 2024 +0200

    [YUNIKORN-2629] Adding a node can result in a deadlock
---
 pkg/cache/context.go | 57 +++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ff6e89ed..bea59346 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -1394,7 +1394,7 @@ func (ctx *Context) InitializeState() error {
                log.Log(log.ShimContext).Error("failed to load nodes", 
zap.Error(err))
                return err
        }
-       acceptedNodes, err := ctx.registerNodes(nodes)
+       acceptedNodes, err := ctx.RegisterNodes(nodes)
        if err != nil {
                log.Log(log.ShimContext).Error("failed to register nodes", 
zap.Error(err))
                return err
@@ -1510,11 +1510,17 @@ func (ctx *Context) registerNode(node *v1.Node) error {
        return nil
 }
 
+func (ctx *Context) RegisterNodes(nodes []*v1.Node) ([]*v1.Node, error) {
+       ctx.lock.Lock()
+       defer ctx.lock.Unlock()
+       return ctx.registerNodes(nodes)
+}
+
+// registerNodes registers the nodes to the scheduler core.
+// This method must be called while holding the Context write lock.
 func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
        nodesToRegister := make([]*si.NodeInfo, 0)
        pendingNodes := make(map[string]*v1.Node)
-       acceptedNodes := make([]*v1.Node, 0)
-       rejectedNodes := make([]*v1.Node, 0)
 
        // Generate a NodeInfo object for each node and add to the registration request
        for _, node := range nodes {
@@ -1535,12 +1541,34 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
                pendingNodes[node.Name] = node
        }
 
-       var wg sync.WaitGroup
+       acceptedNodes, rejectedNodes, err := ctx.registerNodesInternal(nodesToRegister, pendingNodes)
+       if err != nil {
+               log.Log(log.ShimContext).Error("Failed to register nodes", 
zap.Error(err))
+               return nil, err
+       }
+
+       for _, node := range acceptedNodes {
+               // post a successful event to the node
+               events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted",
+                       fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
+       }
+       for _, node := range rejectedNodes {
+               // post a failure event to the node
+               events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected",
+                       fmt.Sprintf("node %s is rejected by the scheduler", node.Name))
+       }
 
+       return acceptedNodes, nil
+}
+
+func (ctx *Context) registerNodesInternal(nodesToRegister []*si.NodeInfo, pendingNodes map[string]*v1.Node) ([]*v1.Node, []*v1.Node, error) {
+       acceptedNodes := make([]*v1.Node, 0)
+       rejectedNodes := make([]*v1.Node, 0)
+
+       var wg sync.WaitGroup
        // initialize wait group with the number of responses we expect
        wg.Add(len(pendingNodes))
 
-       // register with the dispatcher so that we can track our response
        handlerID := fmt.Sprintf("%s-%d", registerNodeContextHandler, 
ctx.txnID.Add(1))
        dispatcher.RegisterEventHandler(handlerID, dispatcher.EventTypeNode, func(event interface{}) {
                nodeEvent, ok := event.(CachedSchedulerNodeEvent)
@@ -1572,24 +1600,17 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) {
                RmID:  schedulerconf.GetSchedulerConf().ClusterID,
        }); err != nil {
                log.Log(log.ShimContext).Error("Failed to register nodes", 
zap.Error(err))
-               return nil, err
+               return nil, nil, err
        }
 
+       // write lock must always be held at this point, releasing it while waiting to avoid any potential deadlocks
+       ctx.lock.Unlock()
+       defer ctx.lock.Lock()
+
        // wait for all responses to accumulate
        wg.Wait()
 
-       for _, node := range acceptedNodes {
-               // post a successful event to the node
-               events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted",
-                       fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
-       }
-       for _, node := range rejectedNodes {
-               // post a failure event to the node
-               events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected",
-                       fmt.Sprintf("node %s is rejected by the scheduler", node.Name))
-       }
-
-       return acceptedNodes, nil
+       return acceptedNodes, rejectedNodes, nil
 }
 
 func (ctx *Context) decommissionNode(node *v1.Node) error {


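The core of the fix is visible in registerNodesInternal above: the Context write lock is released while the code blocks on the WaitGroup for node responses, then re-taken before returning, so event handlers that need the same lock cannot deadlock against the waiter. Below is a minimal, self-contained sketch of that unlock-around-wait pattern; the registry type, its fields, and the simulated responses are illustrative assumptions for this sketch, not code from the YuniKorn repository.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // registry stands in for a structure guarded by a single write lock,
    // similar in spirit to the shim Context.
    type registry struct {
        lock sync.RWMutex
    }

    // RegisterAll takes the write lock and delegates to the locked variant,
    // mirroring the exported/unexported split introduced by the patch.
    func (r *registry) RegisterAll(names []string) []string {
        r.lock.Lock()
        defer r.lock.Unlock()
        return r.registerAll(names)
    }

    // registerAll must be called with the write lock held.
    func (r *registry) registerAll(names []string) []string {
        var wg sync.WaitGroup
        wg.Add(len(names))

        var mu sync.Mutex // protects accepted across response goroutines
        accepted := make([]string, 0, len(names))

        for _, name := range names {
            go func(n string) {
                defer wg.Done()
                time.Sleep(10 * time.Millisecond) // stand-in for an asynchronous scheduler response
                mu.Lock()
                accepted = append(accepted, n)
                mu.Unlock()
            }(name)
        }

        // Release the write lock while blocked on the responses so a handler
        // that needs the same lock cannot deadlock against us; re-acquire it
        // before returning so the caller's deferred Unlock stays balanced.
        r.lock.Unlock()
        defer r.lock.Lock()

        wg.Wait()
        return accepted
    }

    func main() {
        r := &registry{}
        fmt.Println("accepted:", r.RegisterAll([]string{"node-1", "node-2"}))
    }

The deferred re-lock keeps the locking balanced with the caller's deferred Unlock, which is the same invariant the patch preserves between RegisterNodes and registerNodesInternal.
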
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org
