This is an automated email from the ASF dual-hosted git repository. mani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push: new 54c0bf60 [YUNIKORN-1940] remove getChildQueuePath (#657) 54c0bf60 is described below commit 54c0bf6061774fe8940bc37fb32f4e74b419a7eb Author: PoAn Yang <pay...@apache.org> AuthorDate: Tue Oct 3 11:01:09 2023 +0530 [YUNIKORN-1940] remove getChildQueuePath (#657) Closes: #657 Signed-off-by: Manikandan R <maniraj...@gmail.com> --- pkg/scheduler/ugm/group_tracker.go | 16 +- pkg/scheduler/ugm/manager_test.go | 27 ++-- pkg/scheduler/ugm/queue_tracker.go | 270 ++++++++++++++++---------------- pkg/scheduler/ugm/queue_tracker_test.go | 44 +++--- pkg/scheduler/ugm/user_tracker.go | 17 +- pkg/scheduler/ugm/utilities.go | 13 -- pkg/scheduler/ugm/utilities_test.go | 14 -- 7 files changed, 192 insertions(+), 209 deletions(-) diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go index 1830df2a..f6f316f0 100644 --- a/pkg/scheduler/ugm/group_tracker.go +++ b/pkg/scheduler/ugm/group_tracker.go @@ -53,7 +53,7 @@ func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID string, gt.Lock() defer gt.Unlock() gt.applications[applicationID] = user - return gt.queueTracker.increaseTrackedResource(queuePath, applicationID, group, usage) + return gt.queueTracker.increaseTrackedResource(strings.Split(queuePath, configs.DOT), applicationID, group, usage) } func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) { @@ -65,7 +65,7 @@ func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID string, if removeApp { delete(gt.applications, applicationID) } - return gt.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, removeApp) + return gt.queueTracker.decreaseTrackedResource(strings.Split(queuePath, configs.DOT), applicationID, usage, removeApp) } func (gt *GroupTracker) getTrackedApplications() map[string]string { @@ -77,7 +77,7 @@ func (gt *GroupTracker) getTrackedApplications() map[string]string { func (gt *GroupTracker) setLimits(queuePath string, resource *resources.Resource, maxApps uint64) { gt.Lock() defer gt.Unlock() - gt.queueTracker.setLimit(queuePath, resource, maxApps) + gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), resource, maxApps) } func (gt *GroupTracker) headroom(queuePath string) *resources.Resource { @@ -103,19 +103,19 @@ func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDA func (gt *GroupTracker) IsQueuePathTrackedCompletely(queuePath string) bool { gt.RLock() defer gt.RUnlock() - return gt.queueTracker.IsQueuePathTrackedCompletely(queuePath) + return gt.queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath, configs.DOT)) } func (gt *GroupTracker) IsUnlinkRequired(queuePath string) bool { gt.RLock() defer gt.RUnlock() - return gt.queueTracker.IsUnlinkRequired(queuePath) + return gt.queueTracker.IsUnlinkRequired(strings.Split(queuePath, configs.DOT)) } func (gt *GroupTracker) UnlinkQT(queuePath string) bool { gt.RLock() defer gt.RUnlock() - return gt.queueTracker.UnlinkQT(queuePath) + return gt.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT)) } // canBeRemoved Does "root" queue has any child queue trackers? Is there any running applications in "root" qt? @@ -138,7 +138,7 @@ func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[st } gt.Lock() defer gt.Unlock() - applications := gt.queueTracker.decreaseTrackedResourceUsageDownwards(queuePath) + applications := gt.queueTracker.decreaseTrackedResourceUsageDownwards(strings.Split(queuePath, configs.DOT)) removedApplications := make(map[string]string) for app := range applications { if u, ok := gt.applications[app]; ok { @@ -151,5 +151,5 @@ func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[st func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool { gt.Lock() defer gt.Unlock() - return gt.queueTracker.canRunApp(queuePath, applicationID, group) + return gt.queueTracker.canRunApp(strings.Split(queuePath, configs.DOT), applicationID, group) } diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go index a1275b8b..4a0c8365 100644 --- a/pkg/scheduler/ugm/manager_test.go +++ b/pkg/scheduler/ugm/manager_test.go @@ -21,6 +21,7 @@ package ugm import ( "fmt" "strconv" + "strings" "testing" "gotest.tools/v3/assert" @@ -224,19 +225,19 @@ func TestAddRemoveUserAndGroups(t *testing.T) { assert.Equal(t, user.User, manager.GetUserTracker(user.User).userName) assert.Equal(t, user1.User, manager.GetUserTracker(user1.User).userName) - assert.Equal(t, true, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(queuePath1)) - assert.Equal(t, true, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(queuePath2)) - assert.Equal(t, false, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(queuePath1)) - assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(queuePath2)) - assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(queuePath3)) - assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(queuePath4)) - - assert.Equal(t, true, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath1)) - assert.Equal(t, true, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath2)) - assert.Equal(t, false, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath1)) - assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath2)) - assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath3)) - assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(queuePath4)) + assert.Equal(t, true, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT))) + assert.Equal(t, true, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user1.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.User).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT))) + + assert.Equal(t, true, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT))) + assert.Equal(t, true, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user1.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath1, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath2, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath3, configs.DOT))) + assert.Equal(t, false, manager.GetUserTracker(user.Groups[0]).queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath4, configs.DOT))) usage3, err := resources.NewResourceFromConf(map[string]string{"mem": "5M", "vcore": "5"}) if err != nil { diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go index 70c70bea..9534cbde 100644 --- a/pkg/scheduler/ugm/queue_tracker.go +++ b/pkg/scheduler/ugm/queue_tracker.go @@ -70,12 +70,25 @@ const ( group ) -func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID string, trackType trackingType, usage *resources.Resource) bool { +func (qt *QueueTracker) increaseTrackedResource(hierarchy []string, applicationID string, trackType trackingType, usage *resources.Resource) bool { log.Log(log.SchedUGM).Debug("Increasing resource usage", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), + zap.Strings("hierarchy", hierarchy), zap.String("application", applicationID), zap.Stringer("resource", usage)) + // depth first: all the way to the leaf, create if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] == nil { + qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName) + } + if !qt.childQueueTrackers[childName].increaseTrackedResource(hierarchy[1:], applicationID, trackType, usage) { + return false + } + } + finalResourceUsage := qt.resourceUsage.Clone() finalResourceUsage.AddTo(usage) wildCardQuotaExceeded := false @@ -85,7 +98,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID if qt.maxRunningApps != 0 && !resources.Equals(resources.NewResource(), qt.maxResources) { log.Log(log.SchedUGM).Debug("applying enforcement checks using limit settings of specific user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Bool("existing app", existingApp), zap.Uint64("max running apps", qt.maxRunningApps), zap.Stringer("max resources", qt.maxResources)) @@ -93,7 +106,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID resources.StrictlyGreaterThan(finalResourceUsage, qt.maxResources) { log.Log(log.SchedUGM).Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of specific user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Bool("existing app", existingApp), zap.Int("current running applications", len(qt.runningApplications)), zap.Uint64("max running applications", qt.maxRunningApps), @@ -115,7 +128,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID if config != nil { log.Log(log.SchedUGM).Debug("applying enforcement checks using limit settings of wild card user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Bool("existing app", existingApp), zap.Uint64("wild card max running apps", config.maxApplications), zap.Stringer("wild card max resources", config.maxResources), @@ -125,7 +138,7 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID if wildCardQuotaExceeded { log.Log(log.SchedUGM).Warn("Unable to increase resource usage as allowing new application to run would exceed either configured max applications or max resources limit of wild card user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Bool("existing app", existingApp), zap.Int("current running applications", len(qt.runningApplications)), zap.Uint64("max running applications", config.maxApplications), @@ -136,24 +149,12 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID } } - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if childQueuePath != common.Empty { - if qt.childQueueTrackers[immediateChildQueueName] == nil { - qt.childQueueTrackers[immediateChildQueueName] = newQueueTracker(qt.queuePath, immediateChildQueueName) - } - allowed := qt.childQueueTrackers[immediateChildQueueName].increaseTrackedResource(childQueuePath, applicationID, trackType, usage) - if !allowed { - return allowed - } - } - qt.resourceUsage.AddTo(usage) qt.runningApplications[applicationID] = true log.Log(log.SchedUGM).Debug("Successfully increased resource usage", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), - zap.String("qt queue path", qt.queuePath), + zap.String("queue path", qt.queuePath), zap.String("application", applicationID), zap.Bool("existing app", existingApp), zap.Stringer("resource", usage), @@ -164,42 +165,44 @@ func (qt *QueueTracker) increaseTrackedResource(queuePath string, applicationID return true } -func (qt *QueueTracker) decreaseTrackedResource(queuePath string, applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) { +func (qt *QueueTracker) decreaseTrackedResource(hierarchy []string, applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) { log.Log(log.SchedUGM).Debug("Decreasing resource usage", - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), + zap.Strings("hierarchy", hierarchy), zap.String("application", applicationID), zap.Stringer("resource", usage), zap.Bool("removeApp", removeApp)) - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if childQueuePath != common.Empty { - if qt.childQueueTrackers[immediateChildQueueName] == nil { + // depth first: all the way to the leaf, return false if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] == nil { log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map", - zap.String("child queueTracker name", immediateChildQueueName)) + zap.String("child queueTracker name", childName)) return false, false } - removeQT, decreased := qt.childQueueTrackers[immediateChildQueueName].decreaseTrackedResource(childQueuePath, applicationID, usage, removeApp) + removeQT, decreased := qt.childQueueTrackers[childName].decreaseTrackedResource(hierarchy[1:], applicationID, usage, removeApp) if !decreased { return false, decreased } if removeQT { log.Log(log.SchedUGM).Debug("Removed queue tracker linkage from its parent", - zap.String("queue path ", queuePath), - zap.String("removed queue name", immediateChildQueueName), - zap.String("parent queue", qt.queueName)) - delete(qt.childQueueTrackers, immediateChildQueueName) + zap.String("queue path ", qt.queuePath), + zap.String("removed queue name", childName), + zap.String("parent queue name", qt.queueName)) + delete(qt.childQueueTrackers, childName) } } - qt.resourceUsage.SubFrom(usage) if removeApp { log.Log(log.SchedUGM).Debug("Removed application from running applications", zap.String("application", applicationID), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.String("queue name", qt.queueName)) delete(qt.runningApplications, applicationID) } log.Log(log.SchedUGM).Debug("Successfully decreased resource usage", - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.String("application", applicationID), zap.Stringer("resource", usage), zap.Stringer("total resource after decreasing", qt.resourceUsage), @@ -209,40 +212,29 @@ func (qt *QueueTracker) decreaseTrackedResource(queuePath string, applicationID removeQT := len(qt.childQueueTrackers) == 0 && len(qt.runningApplications) == 0 && resources.IsZero(qt.resourceUsage) && qt.maxRunningApps == 0 && resources.Equals(resources.NewResource(), qt.maxResources) log.Log(log.SchedUGM).Debug("Remove queue tracker", - zap.String("queue path ", queuePath), + zap.String("queue path ", qt.queuePath), zap.Bool("remove QT", removeQT)) return removeQT, true } -func (qt *QueueTracker) getChildQueueTracker(queuePath string) *QueueTracker { - var childQueuePath, immediateChildQueueName string - childQueuePath, immediateChildQueueName = getChildQueuePath(queuePath) - childQueueTracker := qt - if childQueuePath != common.Empty { - for childQueuePath != common.Empty { - if childQueueTracker != nil { - if len(childQueueTracker.childQueueTrackers) == 0 || childQueueTracker.childQueueTrackers[immediateChildQueueName] == nil { - newChildQt := newQueueTracker(qt.queuePath, immediateChildQueueName) - childQueueTracker.childQueueTrackers[immediateChildQueueName] = newChildQt - childQueueTracker = newChildQt - } else { - childQueueTracker = childQueueTracker.childQueueTrackers[immediateChildQueueName] - } - } - childQueuePath, immediateChildQueueName = getChildQueuePath(childQueuePath) - } - } - return childQueueTracker -} - -func (qt *QueueTracker) setLimit(queuePath string, maxResource *resources.Resource, maxApps uint64) { +func (qt *QueueTracker) setLimit(hierarchy []string, maxResource *resources.Resource, maxApps uint64) { log.Log(log.SchedUGM).Debug("Setting limits", - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), + zap.Strings("hierarchy", hierarchy), zap.Uint64("max applications", maxApps), zap.Stringer("max resources", maxResource)) - childQueueTracker := qt.getChildQueueTracker(queuePath) - childQueueTracker.maxRunningApps = maxApps - childQueueTracker.maxResources = maxResource + // depth first: all the way to the leaf, create if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] == nil { + qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName) + } + qt.childQueueTrackers[childName].setLimit(hierarchy[1:], maxResource, maxApps) + } else if len(hierarchy) == 1 { + qt.maxRunningApps = maxApps + qt.maxResources = maxResource + } } func (qt *QueueTracker) headroom(hierarchy []string) *resources.Resource { @@ -299,14 +291,18 @@ func (qt *QueueTracker) getResourceUsageDAOInfo(parentQueuePath string) *dao.Res // IsQueuePathTrackedCompletely Traverse queue path upto the end queue through its linkage // to confirm entire queuePath has been tracked completely or not -func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool { - if queuePath == configs.RootQueue || queuePath == qt.queueName { - return true - } - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if immediateChildQueueName != common.Empty { - if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok { - return childUt.IsQueuePathTrackedCompletely(childQueuePath) +func (qt *QueueTracker) IsQueuePathTrackedCompletely(hierarchy []string) bool { + // depth first: all the way to the leaf, ignore if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] != nil { + return qt.childQueueTrackers[childName].IsQueuePathTrackedCompletely(hierarchy[1:]) + } + } else if len(hierarchy) == 1 { + // reach end of hierarchy + if hierarchy[0] == configs.RootQueue || hierarchy[0] == qt.queueName { + return true } } return false @@ -316,19 +312,23 @@ func (qt *QueueTracker) IsQueuePathTrackedCompletely(queuePath string) bool { // linkage needs to be removed or not based on the running applications. // If there are any running applications in end leaf queue, we should remove the linkage between // the leaf and its parent queue using UnlinkQT method. Otherwise, we should leave as it is. -func (qt *QueueTracker) IsUnlinkRequired(queuePath string) bool { - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if immediateChildQueueName != common.Empty { - if childUt, ok := qt.childQueueTrackers[immediateChildQueueName]; ok { - return childUt.IsUnlinkRequired(childQueuePath) +func (qt *QueueTracker) IsUnlinkRequired(hierarchy []string) bool { + // depth first: all the way to the leaf, ignore if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] != nil { + return qt.childQueueTrackers[childName].IsUnlinkRequired(hierarchy[1:]) } - } - if queuePath == configs.RootQueue || queuePath == qt.queueName { - if len(qt.runningApplications) == 0 { - log.Log(log.SchedUGM).Debug("Is Unlink Required?", - zap.String("queue path", queuePath), - zap.Int("no. of applications", len(qt.runningApplications))) - return true + } else if len(hierarchy) == 1 { + // reach end of hierarchy + if hierarchy[0] == configs.RootQueue || hierarchy[0] == qt.queueName { + if len(qt.runningApplications) == 0 { + log.Log(log.SchedUGM).Debug("Is Unlink Required?", + zap.String("queue path", qt.queuePath), + zap.Int("no. of applications", len(qt.runningApplications))) + return true + } } } return false @@ -336,84 +336,90 @@ func (qt *QueueTracker) IsUnlinkRequired(queuePath string) bool { // UnlinkQT Traverse queue path upto the end queue. If end queue has any more child queue trackers, // then goes upto each child queue and removes the linkage with its immediate parent -func (qt *QueueTracker) UnlinkQT(queuePath string) bool { +func (qt *QueueTracker) UnlinkQT(hierarchy []string) bool { log.Log(log.SchedUGM).Debug("Unlinking current queue tracker from its parent", zap.String("current queue ", qt.queueName), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), + zap.Strings("hierarchy", hierarchy), zap.Int("no. of child queue trackers", len(qt.childQueueTrackers))) - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - - if childQueuePath == common.Empty && len(qt.childQueueTrackers) > 0 { - for qName := range qt.childQueueTrackers { - qt.UnlinkQT(qt.queueName + configs.DOT + qName) + // depth first: all the way to the leaf, ignore if not exists + // more than 1 in the slice means we need to recurse down + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] != nil { + if qt.childQueueTrackers[childName].UnlinkQT(hierarchy[1:]) { + delete(qt.childQueueTrackers, childName) + } } - } - - if childQueuePath != common.Empty { - if qt.childQueueTrackers[immediateChildQueueName] != nil { - unlink := qt.childQueueTrackers[immediateChildQueueName].UnlinkQT(childQueuePath) - if unlink { - delete(qt.childQueueTrackers, immediateChildQueueName) + } else if len(hierarchy) <= 1 { + // reach end of hierarchy, unlink all queues under this queue + for childName, childQT := range qt.childQueueTrackers { + if childQT.UnlinkQT([]string{childName}) { + delete(qt.childQueueTrackers, childName) } } } + if len(qt.runningApplications) == 0 && len(qt.childQueueTrackers) == 0 { return true } return false } -// decreaseTrackedResourceUsageDownwards queuePath either could be parent or leaf queue path. If it is parent queue path, then traverse upto the end leaf -// recursively for all child queues, reset resourceUsage and runningApplications to the default value. -// Once downward traversal has been completed, traverse downwards using decreaseTrackedResourceUsageUpwards -func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(queuePath string) map[string]bool { - childQueueTracker := qt.getChildQueueTracker(queuePath) - childQueueTrackers := childQueueTracker.childQueueTrackers +// decreaseTrackedResourceUsageDownwards queuePath either could be parent or leaf queue path. +// If it is parent queue path, then reset resourceUsage and runningApplications for all child queues, +// If it is leaf queue path, reset resourceUsage and runningApplications for queue trackers in this queue path. +func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(hierarchy []string) map[string]bool { + // depth first: all the way to the leaf, ignore if not exists + // more than 1 in the slice means we need to recurse down removedApplications := make(map[string]bool) - for _, childQT := range childQueueTrackers { - if len(childQT.runningApplications) > 0 && childQT.resourceUsage != resources.NewResource() { - removedApplications = childQT.runningApplications - childQT.resourceUsage = resources.NewResource() - childQT.runningApplications = make(map[string]bool) - childQT.decreaseTrackedResourceUsageDownwards(childQT.queuePath) + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] != nil { + removedApplications = qt.childQueueTrackers[childName].decreaseTrackedResourceUsageDownwards(hierarchy[1:]) } - } - qt.decreaseTrackedResourceUsageUpwards(queuePath) - return removedApplications -} - -// decreaseTrackedResourceUsageUpwards Traverse upwards all the way upto the root starting from last queue in queuePath, -// reset resourceUsage and runningApplications to the default value. -func (qt *QueueTracker) decreaseTrackedResourceUsageUpwards(queuePath string) { - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if childQueuePath != common.Empty { - if qt.childQueueTrackers[immediateChildQueueName] == nil { - log.Log(log.SchedUGM).Error("Child queueTracker tracker must be available in child queues map", - zap.String("child queueTracker name", immediateChildQueueName)) + } else if len(hierarchy) <= 1 { + // reach end of hierarchy, remove all resources under this queue + removedApplications = qt.runningApplications + for childName, childQT := range qt.childQueueTrackers { + if len(childQT.runningApplications) > 0 && childQT.resourceUsage != resources.NewResource() { + // runningApplications in parent queue should contain all the running applications in child queues, + // so we don't need to update removedApplications from child queue result. + childQT.decreaseTrackedResourceUsageDownwards([]string{childName}) + } } - qt.childQueueTrackers[immediateChildQueueName].decreaseTrackedResourceUsageUpwards(childQueuePath) } + if len(qt.runningApplications) > 0 && qt.resourceUsage != resources.NewResource() { qt.resourceUsage = resources.NewResource() qt.runningApplications = make(map[string]bool) } + + return removedApplications } -func (qt *QueueTracker) canRunApp(queuePath string, applicationID string, trackType trackingType) bool { +func (qt *QueueTracker) canRunApp(hierarchy []string, applicationID string, trackType trackingType) bool { log.Log(log.SchedUGM).Debug("Checking can run app", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), - zap.String("application", applicationID)) - childQueuePath, immediateChildQueueName := getChildQueuePath(queuePath) - if childQueuePath != common.Empty { - if qt.childQueueTrackers[immediateChildQueueName] != nil { - allowed := qt.childQueueTrackers[immediateChildQueueName].canRunApp(childQueuePath, applicationID, trackType) - if !allowed { - return false - } + zap.String("queue path", qt.queuePath), + zap.String("application", applicationID), + zap.Strings("hierarchy", hierarchy)) + // depth first: all the way to the leaf, create if not exists + // more than 1 in the slice means we need to recurse down + childCanRunApp := true + if len(hierarchy) > 1 { + childName := hierarchy[1] + if qt.childQueueTrackers[childName] == nil { + qt.childQueueTrackers[childName] = newQueueTracker(qt.queuePath, childName) } + childCanRunApp = qt.childQueueTrackers[childName].canRunApp(hierarchy[1:], applicationID, trackType) } + if !childCanRunApp { + return false + } + + // arrived at the leaf or on the way out: check against current max if set var running int if existingApp := qt.runningApplications[applicationID]; existingApp { return true @@ -425,7 +431,7 @@ func (qt *QueueTracker) canRunApp(queuePath string, applicationID string, trackT if qt.maxRunningApps != 0 && running > int(qt.maxRunningApps) { log.Log(log.SchedUGM).Warn("can't run app as allowing new application to run would exceed configured max applications limit of specific user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Int("current running applications", len(qt.runningApplications)), zap.Uint64("max running applications", qt.maxRunningApps)) return false @@ -442,7 +448,7 @@ func (qt *QueueTracker) canRunApp(queuePath string, applicationID string, trackT if config != nil && config.maxApplications != 0 && running > int(config.maxApplications) { log.Log(log.SchedUGM).Warn("can't run app as allowing new application to run would exceed configured max applications limit of wildcard user/group", zap.Int("tracking type", int(trackType)), - zap.String("queue path", queuePath), + zap.String("queue path", qt.queuePath), zap.Int("current running applications", len(qt.runningApplications)), zap.Uint64("max running applications", config.maxApplications)) return false diff --git a/pkg/scheduler/ugm/queue_tracker_test.go b/pkg/scheduler/ugm/queue_tracker_test.go index 55b79a68..38bd189e 100644 --- a/pkg/scheduler/ugm/queue_tracker_test.go +++ b/pkg/scheduler/ugm/queue_tracker_test.go @@ -40,7 +40,7 @@ func TestQTIncreaseTrackedResource(t *testing.T) { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1) } - result := queueTracker.increaseTrackedResource(queuePath1, TestApp1, user, usage1) + result := queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1) } @@ -49,7 +49,7 @@ func TestQTIncreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2) } - result = queueTracker.increaseTrackedResource(queuePath2, TestApp2, user, usage2) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, user, usage2) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage2) } @@ -58,7 +58,7 @@ func TestQTIncreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3) } - result = queueTracker.increaseTrackedResource(queuePath3, TestApp3, user, usage3) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath3, configs.DOT), TestApp3, user, usage3) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath3, TestApp3, usage3) } @@ -67,7 +67,7 @@ func TestQTIncreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3) } - result = queueTracker.increaseTrackedResource(queuePath4, TestApp4, user, usage4) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath4, configs.DOT), TestApp4, user, usage4) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath4, TestApp4, usage4) } @@ -93,7 +93,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage1) } - result := queueTracker.increaseTrackedResource(queuePath1, TestApp1, user, usage1) + result := queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1) } @@ -103,7 +103,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2) } - result = queueTracker.increaseTrackedResource(queuePath2, TestApp2, user, usage2) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, user, usage2) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage2) } @@ -120,13 +120,13 @@ func TestQTDecreaseTrackedResource(t *testing.T) { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3) } - removeQT, decreased := queueTracker.decreaseTrackedResource(queuePath1, TestApp1, usage3, false) + removeQT, decreased := queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, usage3, false) if !decreased { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage3, err) } assert.Equal(t, removeQT, false, "wrong remove queue tracker value") - removeQT, decreased = queueTracker.decreaseTrackedResource(queuePath2, TestApp2, usage3, false) + removeQT, decreased = queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, usage3, false) if !decreased { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage3, err) } @@ -143,7 +143,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage3) } - removeQT, decreased = queueTracker.decreaseTrackedResource(queuePath1, TestApp1, usage4, true) + removeQT, decreased = queueTracker.decreaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, usage4, true) if !decreased { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath1, TestApp1, usage1, err) } @@ -156,7 +156,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage5) } - removeQT, decreased = queueTracker.decreaseTrackedResource(queuePath2, TestApp2, usage5, true) + removeQT, decreased = queueTracker.decreaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, usage5, true) if !decreased { t.Fatalf("unable to decrease tracked resource: queuepath %s, app %s, res %v, error %t", queuePath2, TestApp2, usage2, err) } @@ -166,7 +166,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { assert.Equal(t, removeQT, true, "wrong remove queue tracker value") // Test parent queueTracker has not zero usage, but child queueTrackers has all deleted - result = queueTracker.increaseTrackedResource(queuePath1, TestApp1, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1) } @@ -176,7 +176,7 @@ func TestQTDecreaseTrackedResource(t *testing.T) { if err != nil { t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage2) } - result = queueTracker.increaseTrackedResource("root.parent", TestApp2, user, usage2) + result = queueTracker.increaseTrackedResource([]string{"root", "parent"}, TestApp2, user, usage2) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", "root.parent", TestApp2, usage2) } @@ -216,37 +216,37 @@ func TestQTQuotaEnforcement(t *testing.T) { child2QueueTracker.maxRunningApps = 2 parentQueueTracker.childQueueTrackers["child2"] = child2QueueTracker - result := queueTracker.increaseTrackedResource(queuePath1, TestApp1, user, usage1) + result := queueTracker.increaseTrackedResource(strings.Split(queuePath1, configs.DOT), TestApp1, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, usage1) } - result = queueTracker.increaseTrackedResource(queuePath2, TestApp2, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage1) } - result = queueTracker.increaseTrackedResource(queuePath2, TestApp2, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp2, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath2, TestApp2, usage1) } - result = queueTracker.increaseTrackedResource(queuePath2, TestApp3, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath2, configs.DOT), TestApp3, user, usage1) if result { t.Fatalf("Increasing resource usage should fail as child2's resource usage exceeded configured max resources limit. queuepath %s, app %s, res %v", queuePath2, TestApp3, usage1) } - result = queueTracker.increaseTrackedResource(queuePath3, TestApp3, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath3, configs.DOT), TestApp3, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath3, TestApp3, usage1) } - result = queueTracker.increaseTrackedResource(queuePath4, TestApp4, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath4, configs.DOT), TestApp4, user, usage1) if !result { t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath4, TestApp4, usage1) } - result = queueTracker.increaseTrackedResource(queuePath4, TestApp4, user, usage1) + result = queueTracker.increaseTrackedResource(strings.Split(queuePath4, configs.DOT), TestApp4, user, usage1) if result { t.Fatalf("Increasing resource usage should fail as parent's resource usage exceeded configured max resources limit. queuepath %s, app %s, res %v", queuePath4, TestApp4, usage1) } @@ -259,9 +259,11 @@ func TestHeadroom(t *testing.T) { // nothing exists make sure the hierarchy gets created root := newRootQueueTracker() - parent := root.getChildQueueTracker("root.parent") + root.childQueueTrackers["parent"] = newQueueTracker("root", "parent") + parent := root.childQueueTrackers["parent"] assert.Assert(t, parent != nil, "parent queue tracker should have been created") - leaf := root.getChildQueueTracker(path) + parent.childQueueTrackers["leaf"] = newQueueTracker("root.parent", "leaf") + leaf := parent.childQueueTrackers["leaf"] assert.Assert(t, leaf != nil, "leaf queue tracker should have been created") // auto created trackers no max resource set diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go index 6f761cbf..5639253d 100644 --- a/pkg/scheduler/ugm/user_tracker.go +++ b/pkg/scheduler/ugm/user_tracker.go @@ -57,7 +57,8 @@ func newUserTracker(user string) *UserTracker { func (ut *UserTracker) increaseTrackedResource(queuePath, applicationID string, usage *resources.Resource) bool { ut.Lock() defer ut.Unlock() - increased := ut.queueTracker.increaseTrackedResource(queuePath, applicationID, user, usage) + hierarchy := strings.Split(queuePath, configs.DOT) + increased := ut.queueTracker.increaseTrackedResource(hierarchy, applicationID, user, usage) if increased { gt := ut.appGroupTrackers[applicationID] log.Log(log.SchedUGM).Debug("Increasing resource usage for group", @@ -67,7 +68,7 @@ func (ut *UserTracker) increaseTrackedResource(queuePath, applicationID string, zap.Stringer("resource", usage)) increasedGroupUsage := gt.increaseTrackedResource(queuePath, applicationID, usage, ut.userName) if !increasedGroupUsage { - _, decreased := ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, false) + _, decreased := ut.queueTracker.decreaseTrackedResource(hierarchy, applicationID, usage, false) if !decreased { log.Log(log.SchedUGM).Error("User resource usage rollback has failed", zap.String("queue path", queuePath), @@ -86,7 +87,7 @@ func (ut *UserTracker) decreaseTrackedResource(queuePath, applicationID string, if removeApp { delete(ut.appGroupTrackers, applicationID) } - return ut.queueTracker.decreaseTrackedResource(queuePath, applicationID, usage, removeApp) + return ut.queueTracker.decreaseTrackedResource(strings.Split(queuePath, configs.DOT), applicationID, usage, removeApp) } func (ut *UserTracker) hasGroupForApp(applicationID string) bool { @@ -120,7 +121,7 @@ func (ut *UserTracker) getTrackedApplications() map[string]*GroupTracker { func (ut *UserTracker) setLimits(queuePath string, resource *resources.Resource, maxApps uint64) { ut.Lock() defer ut.Unlock() - ut.queueTracker.setLimit(queuePath, resource, maxApps) + ut.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), resource, maxApps) } func (ut *UserTracker) headroom(queuePath string) *resources.Resource { @@ -148,19 +149,19 @@ func (ut *UserTracker) GetUserResourceUsageDAOInfo() *dao.UserResourceUsageDAOIn func (ut *UserTracker) IsQueuePathTrackedCompletely(queuePath string) bool { ut.RLock() defer ut.RUnlock() - return ut.queueTracker.IsQueuePathTrackedCompletely(queuePath) + return ut.queueTracker.IsQueuePathTrackedCompletely(strings.Split(queuePath, configs.DOT)) } func (ut *UserTracker) IsUnlinkRequired(queuePath string) bool { ut.RLock() defer ut.RUnlock() - return ut.queueTracker.IsUnlinkRequired(queuePath) + return ut.queueTracker.IsUnlinkRequired(strings.Split(queuePath, configs.DOT)) } func (ut *UserTracker) UnlinkQT(queuePath string) bool { ut.RLock() defer ut.RUnlock() - return ut.queueTracker.UnlinkQT(queuePath) + return ut.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT)) } // canBeRemoved Does "root" queue has any child queue trackers? Is there any running applications in "root" qt? @@ -173,5 +174,5 @@ func (ut *UserTracker) canBeRemoved() bool { func (ut *UserTracker) canRunApp(queuePath, applicationID string) bool { ut.Lock() defer ut.Unlock() - return ut.queueTracker.canRunApp(queuePath, applicationID, user) + return ut.queueTracker.canRunApp(strings.Split(queuePath, configs.DOT), applicationID, user) } diff --git a/pkg/scheduler/ugm/utilities.go b/pkg/scheduler/ugm/utilities.go index d2df718f..440a315f 100644 --- a/pkg/scheduler/ugm/utilities.go +++ b/pkg/scheduler/ugm/utilities.go @@ -25,19 +25,6 @@ import ( "github.com/apache/yunikorn-core/pkg/common/configs" ) -func getChildQueuePath(queuePath string) (string, string) { - idx := strings.Index(queuePath, configs.DOT) - if idx == -1 { - return "", "" - } - childQueuePath := queuePath[idx+1:] - idx = strings.Index(childQueuePath, configs.DOT) - if idx == -1 { - return childQueuePath, childQueuePath - } - return childQueuePath, childQueuePath[:idx] -} - // getParentPath return the path of the parent queue and an empty string if this queue is // the root queue. func getParentPath(queuePath string) string { diff --git a/pkg/scheduler/ugm/utilities_test.go b/pkg/scheduler/ugm/utilities_test.go index 291f1012..09481e1e 100644 --- a/pkg/scheduler/ugm/utilities_test.go +++ b/pkg/scheduler/ugm/utilities_test.go @@ -37,20 +37,6 @@ func internalGetResource(usage *dao.ResourceUsageDAOInfo, resources map[string]* return resources } -func TestGetChildQueuePath(t *testing.T) { - childPath, immediateChildName := getChildQueuePath("root.parent.leaf") - assert.Equal(t, childPath, "parent.leaf") - assert.Equal(t, immediateChildName, "parent") - - childPath, immediateChildName = getChildQueuePath("parent.leaf") - assert.Equal(t, childPath, "leaf") - assert.Equal(t, immediateChildName, "leaf") - - childPath, immediateChildName = getChildQueuePath("leaf") - assert.Equal(t, childPath, "") - assert.Equal(t, immediateChildName, "") -} - func TestGetParentQueuePath(t *testing.T) { assert.Equal(t, getParentPath(""), "") assert.Equal(t, getParentPath("root"), "") --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org