This is an automated email from the ASF dual-hosted git repository. mani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push: new 8a525325 [YUNIKORN-1834] Calculate user/group headroom (#580) 8a525325 is described below commit 8a525325e8840bd495f0c14c2dcb184c0d5430cd Author: Manikandan R <maniraj...@gmail.com> AuthorDate: Mon Jul 10 13:37:14 2023 +0530 [YUNIKORN-1834] Calculate user/group headroom (#580) Closes: #580 Signed-off-by: Manikandan R <maniraj...@gmail.com> --- pkg/scheduler/objects/application.go | 17 +++++ pkg/scheduler/partition_test.go | 110 ++++++++++++++++++++++++++++++++ pkg/scheduler/ugm/group_tracker.go | 6 ++ pkg/scheduler/ugm/manager.go | 31 +++++++++ pkg/scheduler/ugm/manager_test.go | 45 +++++++++++++ pkg/scheduler/ugm/queue_tracker.go | 30 +++++++++ pkg/scheduler/ugm/queue_tracker_test.go | 59 +++++++++++++++++ pkg/scheduler/ugm/user_tracker.go | 6 ++ 8 files changed, 304 insertions(+) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 0b150b7f..40cf7876 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -953,6 +953,14 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, preemptionDelay continue } + userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath, sa.user) + if !userHeadroom.FitInMaxUndef(request.GetAllocatedResource()) { + log.Log(log.SchedApplication).Warn("User doesn't have required resources to accommodate this request", + zap.String("required resource", request.GetAllocatedResource().String()), + zap.String("headroom", userHeadroom.String())) + return nil + } + // resource must fit in headroom otherwise skip the request (unless preemption could help) if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) { // attempt preemption @@ -1237,6 +1245,14 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte return alloc } + userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath, sa.user) + if !userHeadroom.FitInMaxUndef(ask.GetAllocatedResource()) { + log.Log(log.SchedApplication).Warn("User doesn't have required resources to accommodate this request", + zap.String("required resource", ask.GetAllocatedResource().String()), + zap.String("headroom", userHeadroom.String())) + continue + } + // check if this fits in the queue's head room if !headRoom.FitInMaxUndef(ask.GetAllocatedResource()) { continue @@ -1463,6 +1479,7 @@ func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation { if !node.preAllocateConditions(ask) { return nil } + // everything OK really allocate alloc := NewAllocation(common.GetNewUUID(), node.NodeID, node.GetInstanceType(), ask) if node.AddAllocation(alloc) { diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index ced8fd43..078a7a26 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -3357,3 +3357,113 @@ func TestTryAllocateMaxRunning(t *testing.T) { assert.Equal(t, alloc.GetApplicationID(), appID2, "expected application app-2 to be allocated") assert.Equal(t, alloc.GetAllocationKey(), allocID, "expected ask alloc-1 to be allocated") } + +func TestUserHeadroom(t *testing.T) { + setupUGM() + partition, err := newConfiguredPartition() + assert.NilError(t, err, "test partition create failed with error") + var res *resources.Resource + res, err = resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"}) + assert.NilError(t, err, "failed to create basic resource") + err = partition.AddNode(newNodeMaxResource("node-1", res), nil) + assert.NilError(t, err, "test node1 add failed unexpected") + err = partition.AddNode(newNodeMaxResource("node-2", res), nil) + assert.NilError(t, err, "test node2 add failed unexpected") + + app1 := newApplication(appID1, "default", "root.parent.sub-leaf") + res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) + assert.NilError(t, err, "failed to create resource") + + err = partition.AddApplication(app1) + assert.NilError(t, err, "failed to add app-1 to partition") + err = app1.AddAllocationAsk(newAllocationAsk(allocID, appID1, res)) + assert.NilError(t, err, "failed to add ask alloc-1 to app-1") + + app2 := newApplication(appID2, "default", "root.parent.sub-leaf") + err = partition.AddApplication(app2) + assert.NilError(t, err, "failed to add app-2 to partition") + err = app2.AddAllocationAsk(newAllocationAsk(allocID, appID2, res)) + assert.NilError(t, err, "failed to add ask alloc-1 to app-2") + + // app 1 would be allocated as there is headroom available for the user + alloc := partition.tryAllocate() + if alloc == nil { + t.Fatal("allocation did not return any allocation") + } + assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") + + // app 2 allocation won't happen as there is no headroom for the user + alloc = partition.tryAllocate() + if alloc != nil { + t.Fatal("allocation should not happen") + } + + res1, err := resources.NewResourceFromConf(map[string]string{"memory": "5", "vcores": "5"}) + assert.NilError(t, err, "failed to create resource") + + app3 := newApplication(appID3, "default", "root.leaf") + err = partition.AddApplication(app3) + assert.NilError(t, err, "failed to add app-3 to partition") + err = app3.AddAllocationAsk(newAllocationAsk(allocID, appID3, res1)) + assert.NilError(t, err, "failed to add ask alloc-1 to app-3") + + // app 3 would be allocated as there is headroom available for the user + alloc = partition.tryAllocate() + if alloc == nil { + t.Fatal("allocation did not return any allocation") + } + assert.Equal(t, alloc.GetResult(), objects.Allocated, "result is not the expected allocated") + + app4 := newApplication("app-4", "default", "root.leaf") + err = partition.AddApplication(app4) + assert.NilError(t, err, "failed to add app-4 to partition") + err = app4.AddAllocationAsk(newAllocationAsk(allocID, "app-4", res1)) + assert.NilError(t, err, "failed to add ask alloc-1 to app-4") + + // app 4 allocation won't happen as there is no headroom for the user + alloc = partition.tryAllocate() + if alloc != nil { + t.Fatal("allocation should not happen") + } + partition.removeApplication(appID1) + partition.removeApplication(appID2) + partition.removeApplication(appID3) + partition.removeApplication("app-4") + + // create a reservation and ensure reservation has been allocated because there is enough headroom for the user to run the app + app5 := newApplication("app-5", "default", "root.parent.sub-leaf") + err = partition.AddApplication(app5) + assert.NilError(t, err, "failed to add app-5 to partition") + + res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) + assert.NilError(t, err, "failed to create resource") + ask := newAllocationAskRepeat("alloc-1", "app-5", res, 2) + err = app5.AddAllocationAsk(ask) + assert.NilError(t, err, "failed to add ask to app") + + node2 := partition.GetNode(nodeID2) + if node2 == nil { + t.Fatal("expected node-2 to be returned got nil") + } + partition.reserve(app5, node2, ask) + + // turn off the second node + node1 := partition.GetNode(nodeID1) + node1.SetSchedulable(false) + + alloc = partition.tryReservedAllocate() + if alloc == nil { + t.Fatal("allocation did not return any allocation") + } + assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "allocation result should have been allocated") + + // create a reservation and ensure reservation has not been allocated because there is no headroom for the user + ask = newAllocationAskRepeat("alloc-2", "app-5", res, 2) + err = app5.AddAllocationAsk(ask) + assert.NilError(t, err, "failed to add ask to app") + partition.reserve(app5, node2, ask) + alloc = partition.tryReservedAllocate() + if alloc != nil { + t.Fatal("allocation should not happen on other nodes as well") + } +} diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go index 48287a44..5a7d3ffa 100644 --- a/pkg/scheduler/ugm/group_tracker.go +++ b/pkg/scheduler/ugm/group_tracker.go @@ -71,6 +71,12 @@ func (gt *GroupTracker) setLimits(queuePath string, resource *resources.Resource gt.queueTracker.setLimit(queuePath, resource, maxApps) } +func (gt *GroupTracker) headroom(queuePath string) *resources.Resource { + gt.Lock() + defer gt.Unlock() + return gt.queueTracker.headroom(queuePath) +} + func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDAOInfo { gt.RLock() defer gt.RUnlock() diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go index 5b6a41f5..3d5f1001 100644 --- a/pkg/scheduler/ugm/manager.go +++ b/pkg/scheduler/ugm/manager.go @@ -329,6 +329,9 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin } limitConfig := &LimitConfig{maxResources: maxResource, maxApplications: limit.MaxApplications} for _, user := range limit.Users { + if user == "" { + continue + } log.Log(log.SchedUGM).Debug("Processing user limits configuration", zap.String("user", user), zap.String("limit", limit.Limit), @@ -344,6 +347,9 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin } } for _, group := range limit.Groups { + if group == "" { + continue + } log.Log(log.SchedUGM).Debug("Processing group limits configuration", zap.String("group", group), zap.String("limit", limit.Limit), @@ -572,6 +578,31 @@ func (m *Manager) getGroupWildCardLimitsConfig(queuePath string) *LimitConfig { return nil } +func (m *Manager) Headroom(queuePath string, user security.UserGroup) *resources.Resource { + m.RLock() + defer m.RUnlock() + var userHeadroom *resources.Resource + var groupHeadroom *resources.Resource + if m.userTrackers[user.User] != nil { + userHeadroom = m.userTrackers[user.User].headroom(queuePath) + log.Log(log.SchedUGM).Debug("Calculated headroom for user", + zap.String("user", user.User), + zap.String("queue path", queuePath), + zap.String("user headroom", userHeadroom.String())) + } + group, err := m.getGroup(user) + if err == nil { + if m.groupTrackers[group] != nil { + groupHeadroom = m.groupTrackers[group].headroom(queuePath) + log.Log(log.SchedUGM).Debug("Calculated headroom for group", + zap.String("group", group), + zap.String("queue path", queuePath), + zap.String("group headroom", groupHeadroom.String())) + } + } + return resources.ComponentWiseMinPermissive(userHeadroom, groupHeadroom) +} + // ClearUserTrackers only for tests func (m *Manager) ClearUserTrackers() { m.Lock() diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go index c0bae9dd..c8698fe6 100644 --- a/pkg/scheduler/ugm/manager_test.go +++ b/pkg/scheduler/ugm/manager_test.go @@ -440,6 +440,51 @@ func TestSetMaxLimitsForRemovedUsers(t *testing.T) { assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) == nil, true) } +func TestUserGroupHeadroom(t *testing.T) { + setupUGM() + // Queue setup: + // root->parent + user := security.UserGroup{User: "user1", Groups: []string{"group1"}} + conf := createUpdateConfig(user.User, user.Groups[0]) + manager := GetUserManager() + assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root")) + + expectedResource, err := resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "50"}) + if err != nil { + t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, expectedResource) + } + usage, err := resources.NewResourceFromConf(map[string]string{"memory": "10", "vcores": "10"}) + if err != nil { + t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage) + } + assertMaxLimits(t, user, expectedResource, 5) + headroom := manager.Headroom("root.parent.leaf", user) + assert.Equal(t, resources.Equals(headroom, usage), true) + + increased := manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user) + assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String()) + + headroom = manager.Headroom("root.parent.leaf", user) + assert.Equal(t, manager.GetUserTracker(user.User) != nil, true) + assert.Equal(t, manager.GetGroupTracker(user.Groups[0]) != nil, true) + assert.Equal(t, resources.Equals(headroom, resources.Multiply(usage, 0)), true) + + increased = manager.IncreaseTrackedResource("root.parent.leaf", TestApp1, usage, user) + assert.Equal(t, increased, false, "unable to increase tracked resource: queuepath "+queuePath1+", app "+TestApp1+", res "+usage.String()) + + // configure limits only for group + conf = createUpdateConfigWithWildCardUsersAndGroups("", user.Groups[0], "*", "*", "80", "80") + assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root")) + + usage1, err := resources.NewResourceFromConf(map[string]string{"memory": "70", "vcores": "70"}) + if err != nil { + t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage) + } + // ensure group headroom returned when there is no limit settings configured for user + headroom = manager.Headroom("root.parent", user) + assert.Equal(t, resources.Equals(headroom, resources.Sub(usage1, usage)), true) +} + func createUpdateConfigWithWildCardUsersAndGroups(user string, group string, wildUser string, wildGroup string, memory string, vcores string) configs.PartitionConfig { conf := configs.PartitionConfig{ Name: "test", diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go index 9ca5e1c7..0dd001bd 100644 --- a/pkg/scheduler/ugm/queue_tracker.go +++ b/pkg/scheduler/ugm/queue_tracker.go @@ -243,6 +243,36 @@ func (qt *QueueTracker) setLimit(queuePath string, maxResource *resources.Resour childQueueTracker.maxResources = maxResource } +func (qt *QueueTracker) headroom(queuePath string) *resources.Resource { + log.Log(log.SchedUGM).Debug("Calculating headroom", + zap.String("queue path", queuePath)) + childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) + if childQueuePath != "" { + if qt.childQueueTrackers[immediateChildQueueName] != nil { + headroom := qt.childQueueTrackers[immediateChildQueueName].headroom(childQueuePath) + if headroom != nil { + return resources.ComponentWiseMinPermissive(headroom, qt.maxResources) + } + } else { + log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map", + zap.String("child queueTracker name", immediateChildQueueName)) + return nil + } + } + + if !resources.Equals(resources.NewResource(), qt.maxResources) { + headroom := qt.maxResources.Clone() + headroom.SubOnlyExisting(qt.resourceUsage) + log.Log(log.SchedUGM).Debug("Calculated headroom", + zap.String("queue path", queuePath), + zap.String("queue", qt.queueName), + zap.String("max resource", qt.maxResources.String()), + zap.String("headroom", headroom.String())) + return headroom + } + return nil +} + func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string) *dao.ResourceUsageDAOInfo { if qt == nil { return &dao.ResourceUsageDAOInfo{} diff --git a/pkg/scheduler/ugm/queue_tracker_test.go b/pkg/scheduler/ugm/queue_tracker_test.go index ea88355a..bcaabb35 100644 --- a/pkg/scheduler/ugm/queue_tracker_test.go +++ b/pkg/scheduler/ugm/queue_tracker_test.go @@ -250,6 +250,65 @@ func TestQTQuotaEnforcement(t *testing.T) { } } +func TestHeadroom(t *testing.T) { + leafQT := newQueueTracker("root.parent", "leaf") + + leafMaxRes, err := resources.NewResourceFromConf(map[string]string{"mem": "60M", "vcore": "60"}) + if err != nil { + t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, leafMaxRes) + } + + parentQT := newQueueTracker("root", "parent") + parentMaxRes := leafMaxRes.Clone() + resources.Multiply(parentMaxRes, 2) + + rootQT := newQueueTracker("", "root") + + parentQT.childQueueTrackers["leaf"] = leafQT + rootQT.childQueueTrackers["parent"] = parentQT + + // Not even a single queue has been configured with max resource + headroom := rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, nil), true) + + leafQT.maxResources = leafMaxRes + parentQT.maxResources = parentMaxRes + + // headroom should be equal to max cap of leaf queue as there is no usage so far + headroom = rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, leafMaxRes), true) + + leafResUsage, err := resources.NewResourceFromConf(map[string]string{"mem": "30M", "vcore": "30"}) + if err != nil { + t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, leafResUsage) + } + leafQT.resourceUsage = leafResUsage + + // headroom should be equal to sub(max cap of leaf queue - resource usage) as there is some usage + headroom = rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, leafResUsage), true) + + leafQT.maxResources = resources.Multiply(leafMaxRes, 2) + parentQT.maxResources = leafMaxRes + + // headroom should be equal to min (leaf max resources, parent resources) + headroom = rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, leafMaxRes), true) + + parentQT.maxResources = resources.NewResource() + + // headroom should be equal to sub(max cap of leaf queue - resource usage) as there is some usage in leaf and max res of both root and parent is nil + headroom = rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, resources.Add(leafMaxRes, leafResUsage)), true) + + rootQT.maxResources = leafMaxRes + + // headroom should be equal to min ( (sub(max cap of leaf queue - resource usage), root resources) as there is some usage in leaf + // and max res of parent is nil + headroom = rootQT.headroom("root.parent.leaf") + assert.Equal(t, resources.Equals(headroom, leafMaxRes), true) +} + func getQTResource(qt *QueueTracker) map[string]*resources.Resource { resources := make(map[string]*resources.Resource) usage := qt.getResourceUsageDAOInfo("") diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go index eb95824a..022f1881 100644 --- a/pkg/scheduler/ugm/user_tracker.go +++ b/pkg/scheduler/ugm/user_tracker.go @@ -89,6 +89,12 @@ func (ut *UserTracker) setLimits(queuePath string, resource *resources.Resource, ut.queueTracker.setLimit(queuePath, resource, maxApps) } +func (ut *UserTracker) headroom(queuePath string) *resources.Resource { + ut.Lock() + defer ut.Unlock() + return ut.queueTracker.headroom(queuePath) +} + func (ut *UserTracker) GetUserResourceUsageDAOInfo() *dao.UserResourceUsageDAOInfo { ut.RLock() defer ut.RUnlock() --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org