This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new dee60d15 [YUNIKORN-2012] group quota is not maintained (#668)
dee60d15 is described below

commit dee60d15ee776d3c9890cacc5538ffa3cbf236fa
Author: Manikandan R <maniraj...@gmail.com>
AuthorDate: Fri Nov 3 12:11:05 2023 +1100

    [YUNIKORN-2012] group quota is not maintained (#668)
    
    In certain cases the group quota was not considered during scheduling.
    This was caused by incorrectly maintaining the quotas in the user group
    manager: the quota was reset when it should not have been, causing it
    to never be checked.
    
    Further test code added in YUNIKORN-2024i via #669
    
    Closes: #668
    
    Signed-off-by: Wilfred Spiegelenburg <wilfr...@apache.org>
---
 pkg/scheduler/partition_test.go    |   1 +
 pkg/scheduler/ugm/group_tracker.go |   3 +-
 pkg/scheduler/ugm/manager.go       | 254 +++++++++++++++++++++++--------------
 pkg/scheduler/ugm/manager_test.go  | 119 ++++++++++++-----
 pkg/scheduler/ugm/queue_tracker.go |  33 ++++-
 pkg/scheduler/ugm/user_tracker.go  |   3 +-
 6 files changed, 275 insertions(+), 138 deletions(-)

diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 186fa315..aa867f0f 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -45,6 +45,7 @@ func setupUGM() {
        userManager := ugm.GetUserManager()
        userManager.ClearUserTrackers()
        userManager.ClearGroupTrackers()
+       userManager.ClearConfigLimits()
 }
 
 func setupNode(t *testing.T, nodeID string, partition *PartitionContext, nodeRes *resources.Resource) *objects.Node {
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index f6f316f0..62304216 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -118,11 +118,10 @@ func (gt *GroupTracker) UnlinkQT(queuePath string) bool {
        return gt.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT))
 }
 
-// canBeRemoved Does "root" queue has any child queue trackers? Is there any running applications in "root" qt?
 func (gt *GroupTracker) canBeRemoved() bool {
        gt.RLock()
        defer gt.RUnlock()
-       return len(gt.queueTracker.childQueueTrackers) == 0 && len(gt.queueTracker.runningApplications) == 0
+       return gt.queueTracker.canBeRemoved()
 }
 
 func (gt *GroupTracker) getName() string {
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index fd40aafe..0f431bc7 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -39,9 +39,11 @@ var m *Manager
 type Manager struct {
        userTrackers              map[string]*UserTracker
        groupTrackers             map[string]*GroupTracker
-       userWildCardLimitsConfig  map[string]*LimitConfig // Hold limits settings of user '*'
-       groupWildCardLimitsConfig map[string]*LimitConfig // Hold limits settings of group '*'
-       configuredGroups          map[string][]string     // Hold groups for all configured queue paths.
+       userWildCardLimitsConfig  map[string]*LimitConfig            // Hold limits settings of user '*'
+       groupWildCardLimitsConfig map[string]*LimitConfig            // Hold limits settings of group '*'
+       configuredGroups          map[string][]string                // Hold groups for all configured queue paths.
+       userLimits                map[string]map[string]*LimitConfig // Holds queue path * user limit config
+       groupLimits               map[string]map[string]*LimitConfig // Holds queue path * group limit config
        sync.RWMutex
 }
 
@@ -312,19 +314,29 @@ func (m *Manager) isGroupRemovable(gt *GroupTracker) bool {
 }
 
 func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string) error {
-       m.Lock()
-       defer m.Unlock()
+       userWildCardLimitsConfig := make(map[string]*LimitConfig)
+       groupWildCardLimitsConfig := make(map[string]*LimitConfig)
+       configuredGroups := make(map[string][]string)
 
-       m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
-       m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
-       m.configuredGroups = make(map[string][]string)
-       return m.internalProcessConfig(config, queuePath)
+       userLimits := make(map[string]map[string]*LimitConfig)  // Holds queue path * user limit config
+       groupLimits := make(map[string]map[string]*LimitConfig) // Holds queue path * group limit config
+
+       // as the new config is parsed, store the results in temporary maps
+       if err := m.internalProcessConfig(config, queuePath, userLimits, groupLimits, userWildCardLimitsConfig, groupWildCardLimitsConfig, configuredGroups); err != nil {
+               return err
+       }
+
+       // compare the existing config with the new config stored in the temporary maps above
+       m.clearEarlierSetLimits(userLimits, groupLimits)
+
+       // switch over - replace the existing config with the new configs
+       m.replaceLimitConfigs(userLimits, groupLimits, userWildCardLimitsConfig, groupWildCardLimitsConfig, configuredGroups)
+
+       return nil
 }
 
-func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string) error {
-       // Holds user and group for which limits have been configured with specific queue path
-       userLimits := make(map[string]bool)
-       groupLimits := make(map[string]bool)
+func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string, newUserLimits map[string]map[string]*LimitConfig, newGroupLimits map[string]map[string]*LimitConfig,
+       newUserWildCardLimitsConfig map[string]*LimitConfig, newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups map[string][]string) error {
        // Traverse limits of specific queue path
        for _, limit := range cur.Limits {
                var maxResource *resources.Resource
@@ -348,12 +360,16 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
                                zap.Uint64("max application", limit.MaxApplications),
                                zap.Any("max resources", limit.MaxResources))
                        if user == common.Wildcard {
-                               m.userWildCardLimitsConfig[queuePath] = limitConfig
+                               newUserWildCardLimitsConfig[queuePath] = limitConfig
                                continue
                        }
-                       if err := m.processUserConfig(user, limitConfig, queuePath, userLimits); err != nil {
+                       if err := m.setUserLimits(user, limitConfig, queuePath); err != nil {
                                return err
                        }
+                       if _, ok := newUserLimits[queuePath]; !ok {
+                               newUserLimits[queuePath] = make(map[string]*LimitConfig)
+                       }
+                       newUserLimits[queuePath][user] = limitConfig
                }
                for _, group := range limit.Groups {
                        if group == common.Empty {
@@ -365,24 +381,24 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
                                zap.String("queue path", queuePath),
                                zap.Uint64("max application", limit.MaxApplications),
                                zap.Any("max resources", limit.MaxResources))
-                       if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
+                       if err := m.setGroupLimits(group, limitConfig, queuePath); err != nil {
                                return err
                        }
+                       if _, ok := newGroupLimits[queuePath]; !ok {
+                               newGroupLimits[queuePath] = make(map[string]*LimitConfig)
+                       }
+                       newGroupLimits[queuePath][group] = limitConfig
                        if group == common.Wildcard {
-                               m.groupWildCardLimitsConfig[queuePath] = limitConfig
+                               newGroupWildCardLimitsConfig[queuePath] = limitConfig
                        } else {
-                               m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
+                               newConfiguredGroups[queuePath] = append(newConfiguredGroups[queuePath], group)
                        }
                }
        }
-       if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
-               return err
-       }
-
        if len(cur.Queues) > 0 {
                for _, child := range cur.Queues {
                        childQueuePath := queuePath + configs.DOT + child.Name
-                       if err := m.internalProcessConfig(child, childQueuePath); err != nil {
+                       if err := m.internalProcessConfig(child, childQueuePath, newUserLimits, newGroupLimits, newUserWildCardLimitsConfig, newGroupWildCardLimitsConfig, newConfiguredGroups); err != nil {
                                return err
                        }
                }
@@ -390,102 +406,132 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
        return nil
 }
 
-func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig, queuePath string, userLimits map[string]bool) error {
-       if err := m.setUserLimits(user, limitConfig, queuePath); err != nil {
-               return err
-       }
-       userLimits[user] = true
-       return nil
-}
+// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
+func (m *Manager) clearEarlierSetLimits(newUserLimits map[string]map[string]*LimitConfig, newGroupLimits map[string]map[string]*LimitConfig) {
+       // Clear already configured limits of groups for which limits have been configured before but not now
+       m.clearEarlierSetGroupLimits(newGroupLimits)
 
-func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, queuePath string, groupLimits map[string]bool) error {
-       if err := m.setGroupLimits(group, limitConfig, queuePath); err != nil {
-               return err
-       }
-       groupLimits[group] = true
-       return nil
+       // Clear already configured limits of users for which limits have been configured before but not now
+       m.clearEarlierSetUserLimits(newUserLimits)
 }
 
-// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
-func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool, groupLimits map[string]bool, queuePath string) error {
-       // Clear already configured limits of group for which limits have been configured before but not now
-       for _, gt := range m.groupTrackers {
-               appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
-               if len(appUsersMap) > 0 {
-                       for app, user := range appUsersMap {
-                               ut := m.userTrackers[user]
-                               ut.setGroupForApp(app, nil)
+// clearEarlierSetUserLimits Traverse the new user config and decide whether earlier usage needs to be cleared or not
+// by comparing it with the existing config. Earlier usage is reset only for limits that were set before but are not configured now
+func (m *Manager) clearEarlierSetUserLimits(newUserLimits map[string]map[string]*LimitConfig) {
+       m.RLock()
+       defer m.RUnlock()
+       for queuePath, limitConfig := range m.userLimits {
+               // Does the queue path exist?
+               if newUserLimit, ok := newUserLimits[queuePath]; !ok {
+                       for u := range limitConfig {
+                               if ut, utExists := m.userTrackers[u]; utExists {
+                                       m.resetUserEarlierUsage(ut, queuePath)
+                               }
+                       }
+               } else {
+                       // Queue path exists. Does the user exist?
+                       for u := range limitConfig {
+                               if _, ulExists := newUserLimit[u]; !ulExists {
+                                       if ut, utExists := m.userTrackers[u]; utExists {
+                                               m.resetUserEarlierUsage(ut, queuePath)
+                                       }
+                               }
                        }
                }
        }
-
-       // Clear already configured limits of user for which limits have been configured before but not now
-       for _, ut := range m.userTrackers {
-               m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
-       }
-       return nil
 }
 
-func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, queuePath string, userLimits map[string]bool) {
+// resetUserEarlierUsage Clear or reset earlier usage only when the user is already tracked for the queue path.
+// Reset the max apps and max resources to default, unlink the end leaf queue of the queue path from its immediate parent and
+// eventually remove the user tracker object itself from ugm if it can be removed.
+func (m *Manager) resetUserEarlierUsage(ut *UserTracker, queuePath string) {
        // Is this user already tracked for the queue path?
        if ut.IsQueuePathTrackedCompletely(queuePath) {
-               u := ut.userName
-               // Is there any limit config set for user in the current configuration? If not, then clear those old limit settings
-               if _, ok := userLimits[u]; !ok {
-                       log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
-                               zap.String("user", u),
-                               zap.String("queue path", queuePath))
-                       // Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
-                       if ut.IsUnlinkRequired(queuePath) {
-                               ut.UnlinkQT(queuePath)
-                       } else {
-                               ut.setLimits(queuePath, resources.NewResource(), 0)
-                               log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for user",
-                                       zap.String("user", u),
-                                       zap.String("queue path", queuePath))
+               log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
+                       zap.String("user", ut.userName),
+                       zap.String("queue path", queuePath))
+               ut.setLimits(queuePath, resources.NewResource(), 0)
+               // Are there any running applications in the end queue of this queue path? If not, then remove the linkage between the end queue and its immediate parent
+               if ut.IsUnlinkRequired(queuePath) {
+                       ut.UnlinkQT(queuePath)
+               }
+               log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for user",
+                       zap.String("user", ut.userName),
+                       zap.String("queue path", queuePath))
+               if ut.canBeRemoved() {
+                       delete(m.userTrackers, ut.userName)
+               }
+       }
+}
+
+// clearEarlierSetGroupLimits Traverse the new group config and decide whether earlier usage needs to be cleared or not
+// by comparing it with the existing config. Earlier usage is reset only for limits that were set before but are not configured now
+func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits map[string]map[string]*LimitConfig) {
+       m.RLock()
+       defer m.RUnlock()
+       for queuePath, limitConfig := range m.groupLimits {
+               // Does the queue path exist?
+               if newGroupLimit, ok := newGroupLimits[queuePath]; !ok {
+                       for g := range limitConfig {
+                               if gt, gtExists := m.groupTrackers[g]; gtExists {
+                                       m.resetGroupEarlierUsage(gt, queuePath)
+                               }
                        }
-                       // Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
-                       // not have any child queue trackers. When the situation comes, remove the linkage between the user and
-                       // its root queue tracker
-                       if ut.canBeRemoved() {
-                               delete(m.userTrackers, ut.userName)
+               } else {
+                       // Queue path exists. Does the group exist?
+                       for g := range limitConfig {
+                               if _, glExists := newGroupLimit[g]; !glExists {
+                                       if gt, gtExists := m.groupTrackers[g]; gtExists {
+                                               m.resetGroupEarlierUsage(gt, queuePath)
+                                       }
+                               }
                        }
                }
        }
 }
 
-func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string, groupLimits map[string]bool) map[string]string {
-       appUsersMap := make(map[string]string)
-       // Is this group already tracked for the queue path?
+// resetGroupEarlierUsage Clear or reset earlier usage only when the group is already tracked for the queue path.
+// Decrease the group usage and collect the list of applications for which the user-app-group linkage needs to be broken.
+// Reset the max apps and max resources to default, unlink the end leaf queue of the queue path from its immediate parent and
+// eventually remove the group tracker object itself from ugm if it can be removed.
+func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
        if gt.IsQueuePathTrackedCompletely(queuePath) {
-               g := gt.groupName
-               // Is there any limit config set for group in the current configuration? If not, then clear those old limit settings
-               if ok := groupLimits[g]; !ok {
-                       log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
-                               zap.String("group", g),
-                               zap.String("queue path", queuePath))
-                       appUsersMap = gt.decreaseAllTrackedResourceUsage(queuePath)
-                       // Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
-                       if gt.IsUnlinkRequired(queuePath) {
-                               gt.UnlinkQT(queuePath)
-                       } else {
-                               gt.setLimits(queuePath, resources.NewResource(), 0)
-                               log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for group",
-                                       zap.String("group", g),
-                                       zap.String("queue path", queuePath))
-                       }
-                       // Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
-                       // not have any child queue trackers. When the situation comes, remove the linkage between the group and
-                       // its root queue tracker
-                       if gt.canBeRemoved() {
-                               delete(m.groupTrackers, gt.groupName)
-                       }
+               log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
+                       zap.String("group", gt.groupName),
+                       zap.String("queue path", queuePath))
+               appUsersMap := gt.decreaseAllTrackedResourceUsage(queuePath)
+               for app, u := range appUsersMap {
+                       ut := m.userTrackers[u]
+                       delete(ut.appGroupTrackers, app)
+               }
+               gt.setLimits(queuePath, resources.NewResource(), 0)
+               // Are there any running applications in the end queue of this queue path? If not, then remove the linkage between the end queue and its immediate parent
+               if gt.IsUnlinkRequired(queuePath) {
+                       gt.UnlinkQT(queuePath)
+               }
+               log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for group",
+                       zap.String("group", gt.groupName),
+                       zap.String("queue path", queuePath))
+               if gt.canBeRemoved() {
+                       delete(m.groupTrackers, gt.groupName)
                }
        }
-       return appUsersMap
+}
+
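+// replaceLimitConfigs switches the manager over to the limit configuration maps built from the newly parsed config.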
+func (m *Manager) replaceLimitConfigs(newUserLimits map[string]map[string]*LimitConfig, newGroupLimits map[string]map[string]*LimitConfig,
+       newUserWildCardLimitsConfig map[string]*LimitConfig, newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups map[string][]string) {
+       m.Lock()
+       defer m.Unlock()
+       m.userLimits = newUserLimits
+       m.groupLimits = newGroupLimits
+       m.userWildCardLimitsConfig = newUserWildCardLimitsConfig
+       m.groupWildCardLimitsConfig = newGroupWildCardLimitsConfig
+       m.configuredGroups = newConfiguredGroups
 }
 
 func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, queuePath string) error {
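+       // take the manager lock here: UpdateConfig no longer holds it while the new config is being processed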
+       m.Lock()
+       defer m.Unlock()
        log.Log(log.SchedUGM).Debug("Setting user limits",
                zap.String("user", user),
                zap.String("queue path", queuePath),
@@ -504,6 +550,8 @@ func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, queuePath
 }
 
 func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig, queuePath string) error {
+       m.Lock()
+       defer m.Unlock()
        log.Log(log.SchedUGM).Debug("Setting group limits",
                zap.String("group", group),
                zap.String("queue path", queuePath),
@@ -620,8 +668,20 @@ func (m *Manager) ClearUserTrackers() {
        m.userTrackers = make(map[string]*UserTracker)
 }
 
+// ClearGroupTrackers only for tests
 func (m *Manager) ClearGroupTrackers() {
        m.Lock()
        defer m.Unlock()
        m.groupTrackers = make(map[string]*GroupTracker)
 }
+
+// ClearConfigLimits only for tests
+func (m *Manager) ClearConfigLimits() {
+       m.Lock()
+       defer m.Unlock()
+       m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
+       m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
+       m.configuredGroups = make(map[string][]string)
+       m.userLimits = make(map[string]map[string]*LimitConfig)
+       m.groupLimits = make(map[string]map[string]*LimitConfig)
+}
diff --git a/pkg/scheduler/ugm/manager_test.go b/pkg/scheduler/ugm/manager_test.go
index 4a0c8365..7ce5e72b 100644
--- a/pkg/scheduler/ugm/manager_test.go
+++ b/pkg/scheduler/ugm/manager_test.go
@@ -59,6 +59,7 @@ func TestUserManagerOnceInitialization(t *testing.T) {
 }
 
 func TestGetGroup(t *testing.T) {
+       setupUGM()
        user := security.UserGroup{User: "test", Groups: []string{"test", "test1"}}
        manager := GetUserManager()
 
@@ -368,15 +369,11 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
 
        // should run as user 'user' setting is map[memory:60 vcores:60] and total usage of "root.parent" is map[memory:50 vcores:50]
        increased := manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user)
-       if !increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
-       }
+       assert.Equal(t, increased, true)
 
        // should not run as user 'user' setting is map[memory:60 vcores:60] and total usage of "root.parent" is map[memory:60 vcores:60]
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp3, usage, user)
-       if increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
-       }
+       assert.Equal(t, increased, false)
 
        // configure max resource for root.parent to allow one more application to run through wild card user settings (not through specific user)
        // configure limits for user2 only. However, user1 should not be cleared as it has running applications
@@ -390,29 +387,22 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
        // user1 still should be able to run app as wild card user '*' setting is map[memory:70 vcores:70] for "root.parent" and
        // total usage of "root.parent" is map[memory:60 vcores:60]
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user)
-       if !increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp2, user)
-       }
+       assert.Equal(t, increased, true)
 
        // user1 should not be able to run app as wild card user '*' setting is map[memory:70 vcores:70] for "root.parent"
        // and total usage of "root.parent" is map[memory:70 vcores:70]
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp3, usage, user)
-       if increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp3, user)
-       }
+       assert.Equal(t, increased, false)
 
        // configure max resource for group1 * root.parent (map[memory:70 vcores:70]) higher than wild card group * root.parent settings (map[memory:10 vcores:10])
        // ensure group's specific settings has been used for enforcement checks as specific limits always has higher precedence when compared to wild card group limit settings
+       // group1 quota (map[memory:70 vcores:70]) has been removed from root.parent.leaf,
+       // so resource usage has been decreased for that group across the entire queue hierarchy (root->parent->leaf).
+       // since group1 quota has now been configured for root.parent, resource usage would be increased from
+       // the place where it left off.
        conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "*", "*", "10", "10")
        assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
 
-       // since group1 quota (map[memory:70 vcores:70]) has reached before, further group increase is not allowed.
-       // Hence, user's tracked usage would be reverted
-       increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user1)
-       if increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
-       }
-
        // configure max resource for user2 * root.parent (map[memory:70 vcores:70]) higher than wild card user * root.parent settings (map[memory:10 vcores:10])
        // ensure user's specific settings has been used for enforcement checks as specific limits always has higher precedence when compared to wild card user limit settings
        conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, "", "*", "", "10", "10")
@@ -421,16 +411,12 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
        // can be allowed to run upto resource usage map[memory:70 vcores:70]
        for i := 1; i <= 7; i++ {
                increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user1)
-               if !increased {
-                       t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
-               }
+               assert.Equal(t, increased, true)
        }
 
        // user2 should not be able to run app as user2 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user1)
-       if increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
-       }
+       assert.Equal(t, increased, false)
 
        user3 := security.UserGroup{User: "user3", Groups: []string{"group3"}}
        conf = createUpdateConfigWithWildCardUsersAndGroups(user1.User, user1.Groups[0], "", "*", "10", "10")
@@ -438,9 +424,7 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
 
        // user3 should be able to run app as group3 uses wild card group limit settings map[memory:10 vcores:10]
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user3)
-       if !increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user)
-       }
+       assert.Equal(t, increased, true)
 
        // user4 (though belongs to different group, group4) should not be able to run app as group4 also
        // uses wild card group limit settings map[memory:10 vcores:10]
@@ -453,16 +437,12 @@ func TestUpdateConfigWithWildCardUsersAndGroups(t *testing.T) {
 
        // Since app is TestApp1, gt of "*" would be used as it is already mapped. group4 won't be used
        increased = manager.IncreaseTrackedResource(queuePath1, TestApp1, usage, user4)
-       if increased {
-               t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
-       }
+       assert.Equal(t, increased, false)
 
        // Now group4 would be used as user4 is running TestApp2 for the first time. So can be allowed to run upto resource usage map[memory:70 vcores:70]
        for i := 1; i <= 7; i++ {
                increased = manager.IncreaseTrackedResource(queuePath1, TestApp2, usage, user4)
-               if !increased {
-                       t.Fatalf("unable to increase tracked resource: queuepath %s, app %s, res %v", queuePath1, TestApp1, user1)
-               }
+               assert.Equal(t, increased, true)
        }
 
        // user4 should not be able to run app as user4 max limit is map[memory:70 vcores:70] and usage so far is map[memory:70 vcores:70]
@@ -540,6 +520,28 @@ func TestUpdateConfigClearEarlierSetLimits(t *testing.T) {
        assert.Equal(t, len(manager.groupWildCardLimitsConfig), 0)
 }
 
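+// TestUpdateConfigClearEarlierSetGroupLimits ensures tracked group usage is kept when a config reload leaves the group limit unchanged, so the quota is still enforced afterwards.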
+func TestUpdateConfigClearEarlierSetGroupLimits(t *testing.T) {
+       setupUGM()
+       user := security.UserGroup{User: "user1", Groups: []string{"group1"}}
+       conf := createConfigWithGroupOnly(user.Groups[0], 50, 5)
+
+       manager := GetUserManager()
+       assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+
+       usage, err := resources.NewResourceFromConf(map[string]string{"memory": "25", "vcores": "25"})
+       if err != nil {
+               t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
+       }
+       cQueue := "root.parent.leaf"
+       for i := 1; i <= 2; i++ {
+               increased := manager.IncreaseTrackedResource(cQueue, TestApp1, usage, user)
+               assert.Equal(t, increased, true, "unable to increase tracked resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+       }
+       assert.NilError(t, manager.UpdateConfig(conf.Queues[0], "root"))
+       increased := manager.IncreaseTrackedResource(cQueue, TestApp1, usage, user)
+       assert.Equal(t, increased, false, "should not be able to increase tracked resource: queuepath "+cQueue+", app "+TestApp1+", res "+usage.String())
+}
+
 func TestSetMaxLimitsForRemovedUsers(t *testing.T) {
        setupUGM()
        // Queue setup:
@@ -653,8 +655,12 @@ func TestUserGroupHeadroom(t *testing.T) {
                t.Errorf("new resource create returned error or wrong resource: error %t, res %v", err, usage)
        }
        // ensure group headroom returned when there is no limit settings configured for user
+       // group1 quota (map[memory:70 vcores:70]) has been removed from root.parent.leaf,
+       // so resource usage has been decreased for that group across the entire queue hierarchy (root->parent->leaf).
+       // since group1 quota has now been configured for root.parent, resource usage would be increased from
+       // the place where it left off. Since there is no usage after the recent config change, the entire group quota would be returned as headroom.
        headroom = manager.Headroom("root.parent", TestApp1, user)
-       assert.Equal(t, resources.Equals(headroom, resources.Sub(usage1, usage)), true)
+       assert.Equal(t, resources.Equals(headroom, usage1), true)
 }
 
 func TestDecreaseTrackedResourceForGroupTracker(t *testing.T) {
@@ -1304,6 +1310,48 @@ func createConfigWithDifferentGroups(user string, group string, resourceKey stri
        return conf
 }
 
+func createConfigWithGroupOnly(group string, mem int, maxApps uint64) configs.PartitionConfig {
+       conf := configs.PartitionConfig{
+               Name: "test",
+               Queues: []configs.QueueConfig{
+                       {
+                               Name:      "root",
+                               Parent:    true,
+                               SubmitACL: "*",
+                               Queues: []configs.QueueConfig{
+                                       {
+                                               Name:      "parent",
+                                               Parent:    true,
+                                               SubmitACL: "*",
+                                               Queues: []configs.QueueConfig{
+                                                       {
+                                                               Name:      "leaf",
+                                                               Parent:    false,
+                                                               SubmitACL: "*",
+                                                               Queues:    nil,
+                                                       },
+                                               },
+                                               Limits: []configs.Limit{
+                                                       {
+                                                               Limit: "parent queue limit",
+                                                               Groups: []string{
+                                                                       group,
+                                                               },
+                                                               MaxResources: map[string]string{
+                                                                       "memory": strconv.Itoa(mem),
+                                                                       "vcores": strconv.Itoa(mem),
+                                                               },
+                                                               MaxApplications: maxApps,
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+       return conf
+}
+
 func createConfigWithoutLimits() configs.PartitionConfig {
        conf := configs.PartitionConfig{
                Name: "test",
@@ -1330,6 +1378,7 @@ func setupUGM() {
        manager := GetUserManager()
        manager.ClearUserTrackers()
        manager.ClearGroupTrackers()
+       manager.ClearConfigLimits()
 }
 
 func assertUGM(t *testing.T, userGroup security.UserGroup, expected *resources.Resource, usersCount int) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go b/pkg/scheduler/ugm/queue_tracker.go
index deef4a74..39124c25 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -250,6 +250,7 @@ func (qt *QueueTracker) headroom(hierarchy []string) *resources.Resource {
                }
                childHeadroom = qt.childQueueTrackers[childName].headroom(hierarchy[1:])
        }
+
        // arrived at the leaf or on the way out: check against current max if set
        if !resources.Equals(resources.NewResource(), qt.maxResources) {
                headroom = qt.maxResources.Clone()
@@ -349,6 +350,10 @@ func (qt *QueueTracker) UnlinkQT(hierarchy []string) bool {
                if qt.childQueueTrackers[childName] != nil {
                        if qt.childQueueTrackers[childName].UnlinkQT(hierarchy[1:]) {
                                delete(qt.childQueueTrackers, childName)
+                               // return false so that the recursion stops once the end queue has detached itself from its immediate parent,
+                               // i.e. once leaf has been detached from root.parent for the root.parent.leaf queue path.
+                               // otherwise, detachment continues all the way up to the root, even parent from root, which is not needed.
+                               return false
                        }
                }
        } else if len(hierarchy) <= 1 {
@@ -389,12 +394,10 @@ func (qt *QueueTracker) decreaseTrackedResourceUsageDownwards(hierarchy []string
                        }
                }
        }
-
        if len(qt.runningApplications) > 0 && qt.resourceUsage != resources.NewResource() {
                qt.resourceUsage = resources.NewResource()
                qt.runningApplications = make(map[string]bool)
        }
-
        return removedApplications
 }
 
@@ -456,3 +459,29 @@ func (qt *QueueTracker) canRunApp(hierarchy []string, applicationID string, trac
        }
        return true
 }
+
+// canBeRemoved Start from the root and traverse all levels of the queue hierarchy to confirm whether the corresponding queue tracker
+// object can be removed from ugm or not. It decides the removal based on running applications, resource usage, child queue trackers,
+// max running apps, max resources etc. It returns false the moment it sees any unexpected values for any queue at any level.
+func (qt *QueueTracker) canBeRemoved() bool {
+       for _, childQT := range qt.childQueueTrackers {
+               // quick check to avoid further traversal
+               if childQT.canBeRemovedInternal() {
+                       if !childQT.canBeRemoved() {
+                               return false
+                       }
+               } else {
+                       return false
+               }
+       }
+       // reached leaf queues, no more to traverse
+       return qt.canBeRemovedInternal()
+}
+
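+// canBeRemovedInternal checks this queue tracker only: it can be removed when it has no running applications, no resource usage, no child queue trackers and no max apps or max resources set.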
+func (qt *QueueTracker) canBeRemovedInternal() bool {
+       if len(qt.runningApplications) == 0 && resources.Equals(resources.NewResource(), qt.resourceUsage) && len(qt.childQueueTrackers) == 0 &&
+               qt.maxRunningApps == 0 && resources.Equals(resources.NewResource(), qt.maxResources) {
+               return true
+       }
+       return false
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index 5639253d..5e842d58 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -164,11 +164,10 @@ func (ut *UserTracker) UnlinkQT(queuePath string) bool {
        return ut.queueTracker.UnlinkQT(strings.Split(queuePath, configs.DOT))
 }
 
-// canBeRemoved Does "root" queue has any child queue trackers? Is there any running applications in "root" qt?
 func (ut *UserTracker) canBeRemoved() bool {
        ut.RLock()
        defer ut.RUnlock()
-       return len(ut.queueTracker.childQueueTrackers) == 0 && len(ut.queueTracker.runningApplications) == 0
+       return ut.queueTracker.canBeRemoved()
 }
 
 func (ut *UserTracker) canRunApp(queuePath, applicationID string) bool {


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org
