This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a525325 [YUNIKORN-1834] Calculate user/group headroom (#580)
8a525325 is described below

commit 8a525325e8840bd495f0c14c2dcb184c0d5430cd
Author: Manikandan R <maniraj...@gmail.com>
AuthorDate: Mon Jul 10 13:37:14 2023 +0530

    [YUNIKORN-1834] Calculate user/group headroom (#580)
    
    Closes: #580
    
    Signed-off-by: Manikandan R <maniraj...@gmail.com>
---
 pkg/scheduler/objects/application.go    |  17 +++++
 pkg/scheduler/partition_test.go         | 110 ++++++++++++++++++++++++++++++++
 pkg/scheduler/ugm/group_tracker.go      |   6 ++
 pkg/scheduler/ugm/manager.go            |  31 +++++++++
 pkg/scheduler/ugm/manager_test.go       |  45 +++++++++++++
 pkg/scheduler/ugm/queue_tracker.go      |  30 +++++++++
 pkg/scheduler/ugm/queue_tracker_test.go |  59 +++++++++++++++++
 pkg/scheduler/ugm/user_tracker.go       |   6 ++
 8 files changed, 304 insertions(+)
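
The patch adds a per-user/per-group quota check to the allocation path: before an ask is
compared against the queue headroom, the scheduler asks the user-group manager (ugm) for the
remaining headroom of the submitting user and its group, and skips the ask when it does not
fit. Below is a minimal sketch of that flow, not code from this patch: the wrapper function
and the import paths are assumptions for illustration; only GetUserManager, Headroom and
FitInMaxUndef are the calls introduced or used by the commit.

        package example

        import (
                "github.com/apache/yunikorn-core/pkg/common/resources"
                "github.com/apache/yunikorn-core/pkg/common/security"
                "github.com/apache/yunikorn-core/pkg/scheduler/ugm"
        )

        // fitsUserAndQueueHeadroom mirrors the check added in tryAllocate and
        // tryReservedAllocate: an ask must fit the user/group headroom as well
        // as the queue headroom before it can be allocated.
        func fitsUserAndQueueHeadroom(queuePath string, user security.UserGroup,
                ask, queueHeadroom *resources.Resource) bool {
                // Manager.Headroom (added in this commit) combines the user's and the
                // group's remaining quota on queuePath via ComponentWiseMinPermissive.
                userHeadroom := ugm.GetUserManager().Headroom(queuePath, user)
                if !userHeadroom.FitInMaxUndef(ask) {
                        return false // user/group quota exhausted, skip this ask
                }
                // existing behaviour is unchanged: the ask must still fit the queue headroom
                return queueHeadroom.FitInMaxUndef(ask)
        }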

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 0b150b7f..40cf7876 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -953,6 +953,14 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, preemptionDelay
                        continue
                }
 
+               userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath, sa.user)
+               if !userHeadroom.FitInMaxUndef(request.GetAllocatedResource()) {
+                       log.Log(log.SchedApplication).Warn("User doesn't have required resources to accommodate this request",
+                               zap.String("required resource", request.GetAllocatedResource().String()),
+                               zap.String("headroom", userHeadroom.String()))
+                       return nil
+               }
+
                // resource must fit in headroom otherwise skip the request (unless preemption could help)
                if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
                        // attempt preemption
@@ -1237,6 +1245,14 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte
                        return alloc
                }
 
+               userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath, sa.user)
+               if !userHeadroom.FitInMaxUndef(ask.GetAllocatedResource()) {
+                       log.Log(log.SchedApplication).Warn("User doesn't have required resources to accommodate this request",
+                               zap.String("required resource", ask.GetAllocatedResource().String()),
+                               zap.String("headroom", userHeadroom.String()))
+                       continue
+               }
+
                // check if this fits in the queue's head room
                if !headRoom.FitInMaxUndef(ask.GetAllocatedResource()) {
                        continue
@@ -1463,6 +1479,7 @@ func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation {
        if !node.preAllocateConditions(ask) {
                return nil
        }
+
        // everything OK really allocate
        alloc := NewAllocation(common.GetNewUUID(), node.NodeID, node.GetInstanceType(), ask)
        if node.AddAllocation(alloc) {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index ced8fd43..078a7a26 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -3357,3 +3357,113 @@ func TestTryAllocateMaxRunning(t *testing.T) {
        assert.Equal(t, alloc.GetApplicationID(), appID2, "expected application app-2 to be allocated")
        assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask alloc-1 to be allocated")
 }
+
+func TestUserHeadroom(t *testing.T) {
+       setupUGM()
+       partition, err := newConfiguredPartition()
+       assert.NilError(t, err, "test partition create failed with error")
+       var res *resources.Resource
+       res, err = resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
+       assert.NilError(t, err, "failed to create basic resource")
+       err = partition.AddNode(newNodeMaxResource("node-1", res), nil)
+       assert.NilError(t, err, "test node1 add failed unexpected")
+       err = partition.AddNode(newNodeMaxResource("node-2", res), nil)
+       assert.NilError(t, err, "test node2 add failed unexpected")
+
+       app1 := newApplication(appID1, "default", "root.parent.sub-leaf")
+       res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"})
+       assert.NilError(t, err, "failed to create resource")
+
+       err = partition.AddApplication(app1)
+       assert.NilError(t, err, "failed to add app-1 to partition")
+       err = app1.AddAllocationAsk(newAllocationAsk(allocID, appID1, res))
+       assert.NilError(t, err, "failed to add ask alloc-1 to app-1")
+
+       app2 := newApplication(appID2, "default", "root.parent.sub-leaf")
+       err = partition.AddApplication(app2)
+       assert.NilError(t, err, "failed to add app-2 to partition")
+       err = app2.AddAllocationAsk(newAllocationAsk(allocID, appID2, res))
+       assert.NilError(t, err, "failed to add ask alloc-1 to app-2")
+
+       // app 1 would be allocated as there is headroom available for the user
+       alloc := partition.tryAllocate()
+       if alloc == nil {
+               t.Fatal("allocation did not return any allocation")
+       }
+       assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated")
+
+       // app 2 allocation won't happen as there is no headroom for the user
+       alloc = partition.tryAllocate()
+       if alloc != nil {
+               t.Fatal("allocation should not happen")
+       }
+
+       res1, err := resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"})
+       assert.NilError(t, err, "failed to create resource")
+
+       app3 := newApplication(appID3, "default", "root.leaf")
+       err = partition.AddApplication(app3)
+       assert.NilError(t, err, "failed to add app-3 to partition")
+       err = app3.AddAllocationAsk(newAllocationAsk(allocID, appID3, res1))
+       assert.NilError(t, err, "failed to add ask alloc-1 to app-3")
+
+       // app 3 would be allocated as there is headroom available for the user
+       alloc = partition.tryAllocate()
+       if alloc == nil {
+               t.Fatal("allocation did not return any allocation")
+       }
+       assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated")
+
+       app4 := newApplication("app-4", "default", "root.leaf")
+       err = partition.AddApplication(app4)
+       assert.NilError(t, err, "failed to add app-4 to partition")
+       err = app4.AddAllocationAsk(newAllocationAsk(allocID, "app-4", res1))
+       assert.NilError(t, err, "failed to add ask alloc-1 to app-4")
+
+       // app 4 allocation won't happen as there is no headroom for the user
+       alloc = partition.tryAllocate()
+       if alloc != nil {
+               t.Fatal("allocation should not happen")
+       }
+       partition.removeApplication(appID1)
+       partition.removeApplication(appID2)
+       partition.removeApplication(appID3)
+       partition.removeApplication("app-4")
+
+       // create a reservation and ensure reservation has been allocated because there is enough headroom for the user to run the app
+       app5 := newApplication("app-5", "default", "root.parent.sub-leaf")
+       err = partition.AddApplication(app5)
+       assert.NilError(t, err, "failed to add app-5 to partition")
+
+       res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"})
+       assert.NilError(t, err, "failed to create resource")
+       ask := newAllocationAskRepeat("alloc-1", "app-5", res, 2)
+       err = app5.AddAllocationAsk(ask)
+       assert.NilError(t, err, "failed to add ask to app")
+
+       node2 := partition.GetNode(nodeID2)
+       if node2 == nil {
+               t.Fatal("expected node-2 to be returned got nil")
+       }
+       partition.reserve(app5, node2, ask)
+
+       // turn off the first node (node-1) so the reserved allocation can only be tried on node-2
+       node1 := partition.GetNode(nodeID1)
+       node1.SetSchedulable(false)
+
+       alloc = partition.tryReservedAllocate()
+       if alloc == nil {
+               t.Fatal("allocation did not return any allocation")
+       }
+       assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "allocation result should have been allocated")
+
+       // create a reservation and ensure reservation has not been allocated because there is no headroom for the user
+       ask = newAllocationAskRepeat("alloc-2", "app-5", res, 2)
+       err = app5.AddAllocationAsk(ask)
+       assert.NilError(t, err, "failed to add ask to app")
+       partition.reserve(app5, node2, ask)
+       alloc = partition.tryReservedAllocate()
+       if alloc != nil {
+               t.Fatal("allocation should not happen on other nodes as well")
+       }
+}
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index 48287a44..5a7d3ffa 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -71,6 +71,12 @@ func (gt *GroupTracker) setLimits(queuePath string, resource *resources.Resource
        gt.queueTracker.setLimit(queuePath, resource, maxApps)
 }
 
+func (gt *GroupTracker) headroom(queuePath string) *resources.Resource {
+       gt.Lock()
+       defer gt.Unlock()
+       return gt.queueTracker.headroom(queuePath)
+}
+
 func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDAOInfo {
        gt.RLock()
        defer gt.RUnlock()
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index 5b6a41f5..3d5f1001 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -329,6 +329,9 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
                }
                limitConfig := &LimitConfig{maxResources: maxResource, maxApplications: limit.MaxApplications}
                for _, user := range limit.Users {
+                       if user == "" {
+                               continue
+                       }
                        log.Log(log.SchedUGM).Debug("Processing user limits configuration",
                                zap.String("user", user),
                                zap.String("limit", limit.Limit),
@@ -344,6 +347,9 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
                        }
                }
                for _, group := range limit.Groups {
+                       if group == "" {
+                               continue
+                       }
                        log.Log(log.SchedUGM).Debug("Processing group limits configuration",
                                zap.String("group", group),
                                zap.String("limit", limit.Limit),
@@ -572,6 +578,31 @@ func (m *Manager) getGroupWildCardLimitsConfig(queuePath string) *LimitConfig {
        return nil
 }
 
+func (m *Manager) Headroom(queuePath string, user security.UserGroup) *resources.Resource {
+       m.RLock()
+       defer m.RUnlock()
+       var userHeadroom *resources.Resource
+       var groupHeadroom *resources.Resource
+       if m.userTrackers[user.User] != nil {
+               userHeadroom = m.userTrackers[user.User].headroom(queuePath)
+               log.Log(log.SchedUGM).Debug("Calculated headroom for user",
+                       zap.String("user", user.User),
+                       zap.String("queue path", queuePath),
+                       zap.String("user headroom", userHeadroom.String()))
+       }
+       group, err := m.getGroup(user)
+       if err == nil {
+               if m.groupTrackers[group] != nil {
+                       groupHeadroom = m.groupTrackers[group].headroom(queuePath)
+                       log.Log(log.SchedUGM).Debug("Calculated headroom for group",
+                               zap.String("group", group),
+                               zap.String("queue path", queuePath),
+                               zap.String("group headroom", groupHeadroom.String()))
+               }
+       }
+       return resources.ComponentWiseMinPermissive(userHeadroom, groupHeadroom)
+}
+
 // ClearUserTrackers only for tests
 func (m *Manager) ClearUserTrackers() {
        m.Lock()
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index c0bae9dd..c8698fe6 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -440,6 +440,51 @@ func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
        assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true)
 }
 
+func TestUserGroupHeadroom(t *testing.T) {
+       setupUGM()
+       // Queue setup:
+       // root->parent
+       user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+       conf := createUpdateConfig(user.User, user.Groups[0])
+       manager := GetUserManager()
+       assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+       expectedResource, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "50"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, expectedResource)
+       }
+       usage, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
+       }
+       assertMaxLimits(t, user, expectedResource, 5)
+       headroom := manager.Headroom("root.parent.leaf", user)
+       assert.Equal(t, resources.Equals(headroom, usage), true)
+
+       increased := manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user)
+       assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
+
+       headroom = manager.Headroom("root.parent.leaf", user)
+       assert.Equal(t, manager.GetUserTracker(user.User) != nil, true)
+       assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true)
+       assert.Equal(t, resources.Equals(headroom, resources.Multiply(usage, 0)), true)
+
+       increased = manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user)
+       assert.Equal(t, increased, false, "unable to increase tracked resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String())
+
+       // configure limits only for group
+       conf = createUpdateConfigWithWildCardUsersAndGroups("", user.Groups[0], "*", "*", "80", "80")
+       assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+       usage1, err := resources.NewResourceFromConf(map[string]string{"memory": "70", "vcores": "70"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
+       }
+       // ensure group headroom returned when there is no limit settings configured for user
+       headroom = manager.Headroom("root.parent", user)
+       assert.Equal(t, resources.Equals(headroom, resources.Sub(usage1, usage)), true)
+}
+
 func createUpdateConfigWithWildCardUsersAndGroups(user string, group string, wildUser string, wildGroup string, memory string, vcores string) configs.PartitionConfig {
        conf := configs.PartitionConfig{
                Name: "test",
diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go
index 9ca5e1c7..0dd001bd 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -243,6 +243,36 @@ func (qt *QueueTracker) setLimit(queuePath string, maxResource *resources.Resour
        childQueueTracker.maxResources = maxResource
 }
 
+func (qt *QueueTracker) headroom(queuePath string) *resources.Resource {
+       log.Log(log.SchedUGM).Debug("Calculating headroom",
+               zap.String("queue path", queuePath))
+       childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath)
+       if childQueuePath != "" {
+               if qt.childQueueTrackers[immediateChildQueueName] != nil {
+                       headroom := qt.childQueueTrackers[immediateChildQueueName].headroom(childQueuePath)
+                       if headroom != nil {
+                               return resources.ComponentWiseMinPermissive(headroom, qt.maxResources)
+                       }
+               } else {
+                       log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map",
+                               zap.String("child queueTracker name", immediateChildQueueName))
+                       return nil
+               }
+       }
+
+       if !resources.Equals(resources.NewResource(), qt.maxResources) {
+               headroom := qt.maxResources.Clone()
+               headroom.SubOnlyExisting(qt.resourceUsage)
+               log.Log(log.SchedUGM).Debug("Calculated headroom",
+                       zap.String("queue path", queuePath),
+                       zap.String("queue", qt.queueName),
+                       zap.String("max resource", qt.maxResources.String()),
+                       zap.String("headroom", headroom.String()))
+               return headroom
+       }
+       return nil
+}
+
 func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string) *dao.ResourceUsageDAOInfo {
        if qt == nil {
                return &dao.ResourceUsageDAOInfo{}
diff --git a/pkg/scheduler/ugm/queue_tracker_test.go b/pkg/scheduler/ugm/queue_tracker_test.go
index ea88355a..bcaabb35 100644
--- a/pkg/scheduler/ugm/queue_tracker_test.go
+++ b/pkg/scheduler/ugm/queue_tracker_test.go
@@ -250,6 +250,65 @@ func TestQTQuotaEnforcement(t *testing.T) {
        }
 }
 
+func TestHeadroom(t *testing.T) {
+       leafQT := newQueueTracker("root.parent", "leaf")
+
+       leafMaxRes, err := resources.NewResourceFromConf(map[string]string{"mem": "60M", "vcore": "60"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, leafMaxRes)
+       }
+
+       parentQT := newQueueTracker("root", "parent")
+       parentMaxRes := leafMaxRes.Clone()
+       resources.Multiply(parentMaxRes, 2)
+
+       rootQT := newQueueTracker("", "root")
+
+       parentQT.childQueueTrackers["leaf"] = leafQT
+       rootQT.childQueueTrackers["parent"] = parentQT
+
+       // Not even a single queue has been configured with max resource
+       headroom := rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, nil), true)
+
+       leafQT.maxResources = leafMaxRes
+       parentQT.maxResources = parentMaxRes
+
+       // headroom should be equal to max cap of leaf queue as there is no usage so far
+       headroom = rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, leafMaxRes), true)
+
+       leafResUsage, err := resources.NewResourceFromConf(map[string]string{"mem": "30M", "vcore": "30"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, leafResUsage)
+       }
+       leafQT.resourceUsage = leafResUsage
+
+       // headroom should be equal to sub(max cap of leaf queue - resource usage) as there is some usage
+       headroom = rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, leafResUsage), true)
+
+       leafQT.maxResources = resources.Multiply(leafMaxRes, 2)
+       parentQT.maxResources = leafMaxRes
+
+       // headroom should be equal to min (leaf max resources, parent resources)
+       headroom = rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, leafMaxRes), true)
+
+       parentQT.maxResources = resources.NewResource()
+
+       // headroom should be equal to sub(max cap of leaf queue - resource usage) as there is some usage in leaf and max res of both root and parent is nil
+       headroom = rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, resources.Add(leafMaxRes, leafResUsage)), true)
+
+       rootQT.maxResources = leafMaxRes
+
+       // headroom should be equal to min ( (sub(max cap of leaf queue - resource usage), root resources) as there is some usage in leaf
+       // and max res of parent is nil
+       headroom = rootQT.headroom("root.parent.leaf")
+       assert.Equal(t, resources.Equals(headroom, leafMaxRes), true)
+}
+
 func getQTResource(qt *QueueTracker) map[string]*resources.Resource {
        resources := make(map[string]*resources.Resource)
        usage := qt.getResourceUsageDAOInfo("")
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index eb95824a..022f1881 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -89,6 +89,12 @@ func (ut *UserTracker) setLimits(queuePath string, resource *resources.Resource,
        ut.queueTracker.setLimit(queuePath, resource, maxApps)
 }
 
+func (ut *UserTracker) headroom(queuePath string) *resources.Resource {
+       ut.Lock()
+       defer ut.Unlock()
+       return ut.queueTracker.headroom(queuePath)
+}
+
 func (ut *UserTracker) GetUserResourceUsageDAOInfo() *dao.UserResourceUsageDAOInfo {
        ut.RLock()
        defer ut.RUnlock()

