This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new cf73072f [YUNIKORN-2500] Use GetPreemptableResource, 
GetRemainingGuaranteedResource in Preemption flow (#830)
cf73072f is described below

commit cf73072f7037500e476e9a88d38a5a4e6779172d
Author: Manikandan R <maniraj...@gmail.com>
AuthorDate: Tue Jun 25 16:16:28 2024 +0530

    [YUNIKORN-2500] Use GetPreemptableResource, GetRemainingGuaranteedResource 
in Preemption flow (#830)
    
    Closes: #830
    
    Signed-off-by: Manikandan R <maniraj...@gmail.com>
---
 pkg/common/resources/resources.go              |  91 ++++++++
 pkg/common/resources/resources_test.go         | 145 ++++++++++++
 pkg/mock/preemption_predicate_plugin.go        |  10 +
 pkg/scheduler/objects/preemption.go            | 203 +++++++++-------
 pkg/scheduler/objects/preemption_queue_test.go |  48 +++-
 pkg/scheduler/objects/preemption_test.go       | 310 ++++++++++++++-----------
 pkg/scheduler/objects/queue.go                 |  15 +-
 pkg/scheduler/objects/queue_test.go            |   2 +-
 pkg/scheduler/objects/utilities_test.go        |   1 +
 9 files changed, 578 insertions(+), 247 deletions(-)

diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index 01967a43..e1236c7c 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -645,6 +645,25 @@ func Equals(left, right *Resource) bool {
        return true
 }
 
+// MatchAny returns true if at least one type in the defined resource exists in the other resource.
+// False if none of the types exist in the other resource.
+// A nil resource is treated as an empty resource (no types defined) and returns false.
+// Values are not considered during the checks.
+func (r *Resource) MatchAny(other *Resource) bool {
+       if r == nil || other == nil {
+               return false
+       }
+       if r == other {
+               return true
+       }
+       for k := range r.Resources {
+               if _, ok := other.Resources[k]; ok {
+                       return true
+               }
+       }
+       return false
+}
+
 // Compare the resources equal returns the specific values for following cases:
 // left  right  return
 // nil   nil    true
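
A quick illustration of MatchAny's semantics (a sketch only; the resource type
names here are made up):

    left := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 1})
    right := resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 100, "memory": 1})
    left.MatchAny(right) // true: the "vcore" type exists in both, values are ignored
    left.MatchAny(nil)   // false: a nil resource defines no types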
@@ -751,6 +770,55 @@ func StrictlyGreaterThanOrEquals(larger, smaller 
*Resource) bool {
        return true
 }
 
+// StrictlyGreaterThanOnlyExisting returns true if all quantities for types in the defined resource are greater than
+// the quantity for the same type in smaller.
+// Types defined in smaller that are not in the defined resource are ignored.
+// Two resources that are equal are not considered strictly larger than each other.
+func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller *Resource) bool {
+       if r == nil {
+               r = Zero
+       }
+       if smaller == nil {
+               smaller = Zero
+       }
+
+       // track whether any value for a common resource type differs
+       notEqual := false
+
+       // Are larger and smaller completely disjoint?
+       atleastOneResourcePresent := false
+       // Are all resources in larger greater than zero?
+       isAllPositiveInLarger := true
+
+       for k, v := range r.Resources {
+               // even when smaller is empty, every resource type in larger should be greater than zero
+               if smaller.IsEmpty() && v <= 0 {
+                       isAllPositiveInLarger = false
+               }
+               // when smaller is not empty
+               if val, ok := smaller.Resources[k]; ok {
+                       // at least one common resource type is there
+                       atleastOneResourcePresent = true
+                       if val > v {
+                               return false
+                       }
+                       if val != v {
+                               notEqual = true
+                       }
+               }
+       }
+
+       switch {
+       case smaller.IsEmpty() && !r.IsEmpty():
+               return isAllPositiveInLarger
+       case atleastOneResourcePresent:
+               return notEqual
+       default:
+               // larger and smaller are completely disjoint: none of the resource types match
+               return !r.IsEmpty() && !smaller.IsEmpty()
+       }
+}
+
 // Have at least one quantity > 0, and no quantities < 0
 // A nil resource is not strictly greater than zero.
 func StrictlyGreaterThanZero(larger *Resource) bool {
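
To make the "only existing" comparison concrete, a sketch with arbitrary
quantities, mirroring the new unit tests below:

    larger := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
    smaller := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 20})
    larger.StrictlyGreaterThanOnlyExisting(smaller)  // true: only "first" is compared, and 10 > 5
    smaller.StrictlyGreaterThanOnlyExisting(larger)  // false: "first" 5 < 10; "second" has no counterpart and is ignored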
@@ -816,6 +884,29 @@ func ComponentWiseMinPermissive(left, right *Resource) 
*Resource {
        return out
 }
 
+// ComponentWiseMinOnlyExisting returns a new Resource with the smallest value for each resource type
+// existing in left; types present only in right are ignored.
+func ComponentWiseMinOnlyExisting(left, right *Resource) *Resource {
+       out := NewResource()
+       if right == nil && left == nil {
+               return nil
+       }
+       if left == nil {
+               return nil
+       }
+       if right == nil {
+               return left.Clone()
+       }
+       for k, v := range left.Resources {
+               if val, ok := right.Resources[k]; ok {
+                       out.Resources[k] = min(v, val)
+               } else {
+                       out.Resources[k] = v
+               }
+       }
+       return out
+}
+
 func (r *Resource) HasNegativeValue() bool {
        if r == nil {
                return false
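
The asymmetry of ComponentWiseMinOnlyExisting matters: the left operand decides
which resource types survive. A sketch using values from the new tests:

    left := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "second": 15})
    right := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
    ComponentWiseMinOnlyExisting(left, right) // {"first": 5, "second": 15}
    ComponentWiseMinOnlyExisting(right, left) // {"first": 5}; "second" exists only in the right operand and is dropped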
diff --git a/pkg/common/resources/resources_test.go 
b/pkg/common/resources/resources_test.go
index d134101f..c1092cf4 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -319,6 +319,111 @@ func TestStrictlyGreaterThan(t *testing.T) {
        }
 }
 
+func TestMatchAnyOnlyExisting(t *testing.T) {
+       var tests = []struct {
+               caseName string
+               left     map[string]Quantity
+               right    map[string]Quantity
+               expected bool
+       }{
+               {"nil resource should not match", nil, nil, false},
+               {"empty resource should not match", map[string]Quantity{}, 
map[string]Quantity{}, false},
+               {"equal positive resources should match", 
map[string]Quantity{"first": 1}, map[string]Quantity{"first": 1}, true},
+               {"equal positive resources with different values should match", 
map[string]Quantity{"first": 1}, map[string]Quantity{"first": 2}, true},
+               {"positive resource and nil resource should not match", 
map[string]Quantity{"first": 1}, nil, false},
+               {"nil resource and positive resource should not match", nil, 
map[string]Quantity{"first": 1}, false},
+               {"positive resource and empty resource should not match", 
map[string]Quantity{"first": 1}, map[string]Quantity{}, false},
+               {"empty resource and positive resource should not match", 
map[string]Quantity{}, map[string]Quantity{"first": 1}, false},
+               {"positive resource should match even though value is zero", 
map[string]Quantity{"zero": 0}, map[string]Quantity{"zero": 0}, true},
+               {"positive resource should match even though extra resource 
type is there", map[string]Quantity{"first": 10}, map[string]Quantity{"first": 
10, "second": 1}, true},
+               {"positive resource should match even though extra resource 
type is there", map[string]Quantity{"first": 10, "second": 1}, 
map[string]Quantity{"first": 10}, true},
+               {"resource should not match", map[string]Quantity{"first": 10}, 
map[string]Quantity{"second": 10}, false},
+               {"resource should not match", map[string]Quantity{"second": 
10}, map[string]Quantity{"first": 10}, false},
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       var left *Resource
+                       var right *Resource
+                       if tt.left != nil {
+                               left = NewResourceFromMap(tt.left)
+                       }
+                       if tt.right != nil {
+                               right = NewResourceFromMap(tt.right)
+                       }
+                       if result := left.MatchAny(right); result != 
tt.expected {
+                               t.Errorf("MatchAny: got %v, expected %v", 
result, tt.expected)
+                       }
+               })
+       }
+}
+
+func TestStrictlyGreaterThanOnlyExisting(t *testing.T) {
+       type inputs struct {
+               larger  map[string]Quantity
+               smaller map[string]Quantity
+               sameRef bool
+       }
+       type outputs struct {
+               larger  bool
+               smaller bool
+       }
+       var tests = []struct {
+               caseName string
+               input    inputs
+               expected outputs
+       }{
+               {"Nil check", inputs{nil, nil, false}, outputs{false, false}},
+               {"Positive resource and empty resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{}, false}, 
outputs{true, false}},
+               {"Positive resource and nil resources", 
inputs{map[string]Quantity{"first": 10}, nil, false}, outputs{true, false}},
+
+               {"Equal Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10}, 
false}, outputs{false, false}},
+               {"Different Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20}, 
false}, outputs{false, true}},
+               {"Different Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5}, 
false}, outputs{true, false}},
+
+               {"Equal Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10, 
"sec": 10}, false}, outputs{false, false}},
+               {"Different Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20, 
"sec": 10}, false}, outputs{false, true}},
+               {"Different Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5, "sec": 
10}, false}, outputs{true, false}},
+
+               {"Equal Positive resources but completely disjoint", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"sec": 10}, 
false}, outputs{true, true}},
+               {"Zero resource and empty resources", 
inputs{map[string]Quantity{"first": 0}, map[string]Quantity{}, false}, 
outputs{false, false}},
+               {"Zero resource and nil resources", 
inputs{map[string]Quantity{"first": 0}, nil, false}, outputs{false, false}},
+
+               {"Negative resource and empty resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{}, false}, 
outputs{false, false}},
+               {"Negative resource and nil resources", 
inputs{map[string]Quantity{"first": -10}, nil, false}, outputs{false, false}},
+               {"Negative resource and empty resources", 
inputs{map[string]Quantity{"first": -10, "sec": 10}, map[string]Quantity{}, 
false}, outputs{false, false}},
+               {"Negative resource and nil resources", 
inputs{map[string]Quantity{"first": -10, "sec": 10}, nil, false}, 
outputs{false, false}},
+
+               {"Equal Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10}, 
false}, outputs{false, false}},
+               {"Different Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20}, 
false}, outputs{true, false}},
+               {"Different Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5}, 
false}, outputs{false, true}},
+
+               {"Equal Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10, 
"sec": 10}, false}, outputs{false, false}},
+               {"Different Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20, 
"sec": 10}, false}, outputs{true, false}},
+               {"Different Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5, 
"sec": 10}, false}, outputs{false, true}},
+
+               {"Equal Negative resources but completely disjoint", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"sec": -10}, 
false}, outputs{true, true}},
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       var compare, base *Resource
+                       if tt.input.larger != nil {
+                               compare = NewResourceFromMap(tt.input.larger)
+                       }
+                       if tt.input.sameRef {
+                               base = compare
+                       } else {
+                               base = NewResourceFromMap(tt.input.smaller)
+                       }
+                       if result := compare.StrictlyGreaterThanOnlyExisting(base); result != tt.expected.larger {
+                               t.Errorf("compare %v, base %v, got %v, expected %v", compare, base, result, tt.expected.larger)
+                       }
+                       if result := base.StrictlyGreaterThanOnlyExisting(compare); result != tt.expected.smaller {
+                               t.Errorf("base %v, compare %v, got %v, expected %v", base, compare, result, tt.expected.smaller)
+                       }
+               })
+       }
+}
+
 func TestStrictlyGreaterThanOrEquals(t *testing.T) {
        type inputs struct {
                larger  map[string]Quantity
@@ -443,6 +548,46 @@ func TestComponentWiseMinPermissive(t *testing.T) {
        }
 }
 
+func TestComponentWiseMinOnlyExisting(t *testing.T) {
+       testCases := []struct {
+               name     string
+               left     map[string]Quantity
+               right    map[string]Quantity
+               expected map[string]Quantity
+       }{
+               {"Min of nil resources should be nil", nil, nil, nil},
+               {"Min of empty resources should be empty resource ", 
map[string]Quantity{}, map[string]Quantity{}, map[string]Quantity{}},
+               {"Min of positive resource and nil resource", 
map[string]Quantity{"first": 5}, nil, map[string]Quantity{"first": 5}},
+               {"Min of nil resource and positive resource", nil, 
map[string]Quantity{"first": 5}, nil},
+               {"Min of two positive resources", map[string]Quantity{"first": 
5}, map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5}},
+               {"Min of two positive resources", map[string]Quantity{"first": 
10}, map[string]Quantity{"first": 5}, map[string]Quantity{"first": 5}},
+               {"Min of positive resource and negative resource", 
map[string]Quantity{"first": 5}, map[string]Quantity{"first": -5}, 
map[string]Quantity{"first": -5}},
+               {"Min of positive resource and negative resource", 
map[string]Quantity{"first": -5}, map[string]Quantity{"first": 5}, 
map[string]Quantity{"first": -5}},
+               {"Min of two positive resources with extra resource types", 
map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5, "second": 
15}, map[string]Quantity{"first": 5}},
+               {"Min of two positive resources with extra resource types", 
map[string]Quantity{"first": 5, "second": 15}, map[string]Quantity{"first": 
10}, map[string]Quantity{"first": 5, "second": 15}},
+               {"Min of positive resource and negative resource with extra 
resource types", map[string]Quantity{"first": 10}, map[string]Quantity{"first": 
-5, "second": 15}, map[string]Quantity{"first": -5}},
+               {"Min of positive resource and negative resource with extra 
resource types", map[string]Quantity{"first": -5, "second": 15}, 
map[string]Quantity{"first": 10}, map[string]Quantity{"first": -5, "second": 
15}},
+       }
+       for _, tc := range testCases {
+               var left *Resource
+               var right *Resource
+               var expected *Resource
+               if tc.left != nil {
+                       left = NewResourceFromMap(tc.left)
+               }
+               if tc.right != nil {
+                       right = NewResourceFromMap(tc.right)
+               }
+               if tc.expected != nil {
+                       expected = NewResourceFromMap(tc.expected)
+               }
+               t.Run(tc.name, func(t *testing.T) {
+                       result := ComponentWiseMinOnlyExisting(left, right)
+                       assert.DeepEqual(t, result, expected)
+               })
+       }
+}
+
 func TestComponentWiseMax(t *testing.T) {
        type inputs struct {
                res1    map[string]Quantity
diff --git a/pkg/mock/preemption_predicate_plugin.go 
b/pkg/mock/preemption_predicate_plugin.go
index af47c3ac..ea4b99e8 100644
--- a/pkg/mock/preemption_predicate_plugin.go
+++ b/pkg/mock/preemption_predicate_plugin.go
@@ -22,6 +22,8 @@ import (
        "errors"
        "fmt"
 
+       "github.com/apache/yunikorn-core/pkg/locking"
+
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -31,6 +33,8 @@ type PreemptionPredicatePlugin struct {
        allocations  map[string]string
        preemptions  []Preemption
        errHolder    *errHolder
+
+       locking.RWMutex
 }
 
 type Preemption struct {
@@ -47,6 +51,8 @@ type errHolder struct {
 }
 
 func (m *PreemptionPredicatePlugin) Predicates(args *si.PredicatesArgs) error {
+       m.RLock()
+       defer m.RUnlock()
        if args.Allocate {
                nodeID, ok := m.allocations[args.AllocationKey]
                if !ok {
@@ -69,6 +75,8 @@ func (m *PreemptionPredicatePlugin) Predicates(args 
*si.PredicatesArgs) error {
 }
 
 func (m *PreemptionPredicatePlugin) PreemptionPredicates(args 
*si.PreemptionPredicatesArgs) *si.PreemptionPredicatesResponse {
+       m.Lock()
+       defer m.Unlock()
        result := &si.PreemptionPredicatesResponse{
                Success: false,
                Index:   -1,
@@ -109,6 +117,8 @@ func (m *PreemptionPredicatePlugin) 
PreemptionPredicates(args *si.PreemptionPred
 // GetPredicateError returns the error set by the preemption predicate check 
that failed.
 // Returns a nil error on success.
 func (m *PreemptionPredicatePlugin) GetPredicateError() error {
+       m.RLock()
+       defer m.RUnlock()
        return m.errHolder.err
 }
 
diff --git a/pkg/scheduler/objects/preemption.go 
b/pkg/scheduler/objects/preemption.go
index 110cd650..f006a0ed 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -195,7 +195,8 @@ func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
        for _, snapshot := range queues {
                for _, alloc := range snapshot.PotentialVictims {
                        snapshot.RemoveAllocation(alloc.GetAllocatedResource())
-                       if currentQueue.IsWithinGuaranteedResource() {
+                       remaining := 
currentQueue.GetRemainingGuaranteedResource()
+                       if remaining != nil && 
resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {
                                return true
                        }
                }
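
The new guard is the old IsWithinGuaranteedResource check expressed through the
remaining guaranteed resource. A hedged sketch with made-up numbers:

    // hypothetical queue: guaranteed {"first": 10}, current usage {"first": 12}
    remaining := currentQueue.GetRemainingGuaranteedResource() // {"first": -2}
    resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) // false: still above guarantee
    // after removing a victim worth {"first": 4}, remaining becomes {"first": 2} and the guard passes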
@@ -225,9 +226,6 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable 
*resources.Resource, po
                return -1, nil
        }
 
-       // speculatively add the current ask
-       askQueue.AddAllocation(p.ask.GetAllocatedResource())
-
        // First pass: Check each task to see whether we are able to reduce our 
shortfall by preempting each
        // task in turn, and filter out tasks which will cause their queue to 
drop below guaranteed capacity.
        // If a task could be preempted without violating queue constraints, 
add it to either the 'head' list or the
@@ -239,15 +237,37 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable 
*resources.Resource, po
                // check to see if removing this task will keep queue above 
guaranteed amount; if not, skip to the next one
                if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
                        if queueSnapshot, ok2 := 
allocationsByQueueSnap[qv.QueuePath]; ok2 {
+                               oldRemaining := 
queueSnapshot.GetRemainingGuaranteedResource()
                                
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
-                               // did removing this allocation still keep the 
queue over-allocated?
-                               if 
queueSnapshot.IsAtOrAboveGuaranteedResource() {
+                               preemptableResource := 
queueSnapshot.GetPreemptableResource()
+
+                               // Did removing this allocation still keep the queue over-allocated?
+                               // At times, over-allocation happens because of resource types that are in use but not defined as guaranteed.
+                               // So, as an additional check, a negative remaining guaranteed resource before removing the victim means
+                               // a genuinely useful victim is there.
+                               // When victims are densely populated on a specific node, checking/honouring the guaranteed quota on the ask
+                               // or preemptor queue acts as an early filtering layer to carry forward only the required victims.
+                               // For other cases, like victims spread over multiple nodes, this doesn't add much value.
+                               if 
resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
+                                       (oldRemaining == nil || 
resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
+                                       // add the current victim into the ask 
queue
+                                       
askQueue.AddAllocation(victim.GetAllocatedResource())
+                                       askQueueNewRemaining := 
askQueue.GetRemainingGuaranteedResource()
+
+                                       // Did adding this allocation make the ask queue over-utilized?
+                                       if askQueueNewRemaining != nil && 
resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
+                                               
askQueue.RemoveAllocation(victim.GetAllocatedResource())
+                                               
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
+                                               break
+                                       }
+
                                        // check to see if the shortfall on the 
node has changed
                                        shortfall := 
resources.SubEliminateNegative(p.ask.GetAllocatedResource(), 
nodeCurrentAvailable)
                                        newAvailable := 
resources.Add(nodeCurrentAvailable, victim.GetAllocatedResource())
                                        newShortfall := 
resources.SubEliminateNegative(p.ask.GetAllocatedResource(), newAvailable)
                                        if resources.EqualsOrEmpty(shortfall, 
newShortfall) {
                                                // shortfall did not change, so 
task should only be considered as a last resort
+                                               
askQueue.RemoveAllocation(victim.GetAllocatedResource())
                                                
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
                                                tail = append(tail, victim)
                                        } else {
@@ -273,15 +293,12 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable 
*resources.Resource, po
        allocationsByQueueSnap = p.duplicateQueueSnapshots()
 
        // get the current queue snapshot
-       askQueue, ok2 := allocationsByQueueSnap[p.queuePath]
+       _, ok2 := allocationsByQueueSnap[p.queuePath]
        if !ok2 {
                log.Log(log.SchedPreemption).Warn("BUG: Queue not found by 
name", zap.String("queuePath", p.queuePath))
                return -1, nil
        }
 
-       // speculatively add the current ask
-       askQueue.AddAllocation(p.ask.GetAllocatedResource())
-
        // Second pass: The task ordering can no longer change. For each task, 
check that queue constraints would not be
        // violated if the task were to be preempted. If so, discard the task. 
If the task can be preempted, adjust
        // both the node available capacity and the queue headroom. Save the 
Index within the results of the first task
@@ -292,8 +309,17 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable 
*resources.Resource, po
                // check to see if removing this task will keep queue above 
guaranteed amount; if not, skip to the next one
                if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
                        if queueSnapshot, ok2 := 
allocationsByQueueSnap[qv.QueuePath]; ok2 {
+                               oldRemaining := 
queueSnapshot.GetRemainingGuaranteedResource()
                                
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
-                               if 
queueSnapshot.IsAtOrAboveGuaranteedResource() {
+                               preemptableResource := 
queueSnapshot.GetPreemptableResource()
+
+                               // Did removing this allocation still keep the queue over-allocated?
+                               // At times, over-allocation happens because of resource types that are in use but not defined as guaranteed.
+                               // So, as an additional check, a negative remaining guaranteed resource before removing the victim means
+                               // a genuinely useful victim is there.
+                               // Similar checks could be added on the ask or preemptor queue to prevent it from becoming over-utilized.
+                               if 
resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
+                                       (oldRemaining == nil || 
resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
                                        // removing task does not violate queue 
constraints, adjust queue and node
                                        
nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
                                        // check if ask now fits and we haven't 
had this happen before
@@ -410,9 +436,6 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims 
[]*Allocation) ([]*Al
                return nil, false
        }
 
-       // speculatively add the current ask
-       askQueue.AddAllocation(p.ask.GetAllocatedResource())
-
        // remove all victims previously chosen for the node
        seen := make(map[string]*Allocation, 0)
        for _, victim := range nodeVictims {
@@ -442,25 +465,38 @@ func (p *Preemptor) 
calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
        // evaluate each potential victim in turn, stopping once sufficient 
resources have been freed
        victims := make([]*Allocation, 0)
        for _, victim := range potentialVictims {
-               // stop search if the ask fits into the queue
-               if askQueue.IsWithinGuaranteedResource() {
-                       break
-               }
                // check to see if removing this task will keep queue above 
guaranteed amount; if not, skip to the next one
                if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
                        if queueSnapshot, ok2 := 
allocationsByQueueSnap[qv.QueuePath]; ok2 {
-                               remaining := askQueue.GetRemainingGuaranteed()
+                               oldRemaining := 
queueSnapshot.GetRemainingGuaranteedResource()
                                
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
-                               // did removing this allocation still keep the 
queue over-allocated?
-                               if 
queueSnapshot.IsAtOrAboveGuaranteedResource() {
-                                       // check to see if the shortfall on the 
queue has changed
-                                       newRemaining := 
askQueue.GetRemainingGuaranteed()
-                                       if resources.EqualsOrEmpty(remaining, 
newRemaining) {
-                                               // remaining guaranteed amount 
in ask queue did not change, so preempting task won't help
+
+                               // Did removing this allocation still keep the queue over-allocated?
+                               // At times, over-allocation happens because of resource types that are in use but not defined as guaranteed.
+                               // So, as an additional check, a negative remaining guaranteed resource before removing the victim means
+                               // a genuinely useful victim is there.
+                               preemptableResource := 
queueSnapshot.GetPreemptableResource()
+                               if 
resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
+                                       (oldRemaining == nil || 
resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
+                                       askQueueRemainingAfterVictimRemoval := 
askQueue.GetRemainingGuaranteedResource()
+
+                                       // add the current victim into the ask 
queue
+                                       
askQueue.AddAllocation(victim.GetAllocatedResource())
+                                       askQueueNewRemaining := 
askQueue.GetRemainingGuaranteedResource()
+                                       // Did adding this allocation make the ask queue over-utilized?
+                                       if askQueueNewRemaining != nil && 
resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
+                                               
askQueue.RemoveAllocation(victim.GetAllocatedResource())
                                                
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
-                                       } else {
+                                               break
+                                       }
+                                       // check to see if the shortfall on the 
queue has changed
+                                       if 
!resources.EqualsOrEmpty(askQueueRemainingAfterVictimRemoval, 
askQueueNewRemaining) {
                                                // remaining capacity changed, 
so we should keep this task
                                                victims = append(victims, 
victim)
+                                       } else {
+                                               // remaining guaranteed amount 
in ask queue did not change, so preempting task won't help
+                                               
askQueue.RemoveAllocation(victim.GetAllocatedResource())
+                                               
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
                                        }
                                } else {
                                        // removing this allocation would have 
reduced queue below guaranteed limits, put it back
@@ -469,8 +505,9 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims 
[]*Allocation) ([]*Al
                        }
                }
        }
-
-       if askQueue.IsWithinGuaranteedResource() {
+       // Finally, is the ask queue usage at or below its guaranteed quota?
+       finalRemainingRes := askQueue.GetRemainingGuaranteedResource()
+       if finalRemainingRes != nil && 
resources.StrictlyGreaterThanOrEquals(finalRemainingRes, resources.Zero) {
                return victims, true
        }
        return nil, false
@@ -511,7 +548,6 @@ func (p *Preemptor) tryNodes() (string, []*Allocation, 
bool) {
        if result != nil && result.success {
                return result.nodeID, result.victims, true
        }
-
        return "", nil, false
 }
 
@@ -542,8 +578,44 @@ func (p *Preemptor) TryPreemption() (*Allocation, bool) {
                return nil, false
        }
 
-       // preempt the victims
+       // Do the victims collected so far fulfill the ask's needs? In case of any shortfall between the ask resource
+       // requirement and the total victim resources, preemption won't help even though victims have been collected.
+
+       // Holds the total victim resources
+       victimsTotalResource := resources.NewResource()
+
+       fitIn := false
+       nodeCurrentAvailable := p.nodeAvailableMap
+       if nodeCurrentAvailable[nodeID].FitIn(p.ask.GetAllocatedResource()) {
+               fitIn = true
+       }
+
+       // Since there could be more victims than actually needed, ensure only the required victims are kept in the end.
+       // TODO: There is room for improvement, especially when there are many victims. Victims could be chosen based
+       // on different criteria: for example, they could be picked either from a specific node (bin packing) or
+       // from multiple nodes (fair), given the choices.
+       var finalVictims []*Allocation
        for _, victim := range victims {
+               // Victims from any node are acceptable as long as the chosen node has enough space to accommodate the ask.
+               // Otherwise, preempting victims from 'n' different nodes doesn't help to achieve the goal.
+               if !fitIn && victim.nodeID != nodeID {
+                       continue
+               }
+               // stop collecting victims once the ask's resource requirement has been met
+               if p.ask.GetAllocatedResource().StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+                       finalVictims = append(finalVictims, victim)
+               }
+               // add the victim resources to the total
+               victimsTotalResource.AddTo(victim.GetAllocatedResource())
+       }
+
+       if p.ask.GetAllocatedResource().StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+               // there is a shortfall, so preemption doesn't help
+               return nil, false
+       }
+
+       // preempt the victims
+       for _, victim := range finalVictims {
                if victimQueue := 
p.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
                        
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
                        victim.MarkPreempted()
@@ -668,7 +740,6 @@ func (qps *QueuePreemptionSnapshot) Duplicate(copy 
map[string]*QueuePreemptionSn
        if qps == nil {
                return nil
        }
-
        if existing, ok := copy[qps.QueuePath]; ok {
                return existing
        }
@@ -692,63 +763,6 @@ func (qps *QueuePreemptionSnapshot) Duplicate(copy 
map[string]*QueuePreemptionSn
        return snapshot
 }
 
-// IsAtOrAboveGuaranteedResource determines if this queue is exceeding 
resource guarantees and therefore
-// may be eligible for further preemption
-func (qps *QueuePreemptionSnapshot) IsAtOrAboveGuaranteedResource() bool {
-       if qps == nil {
-               return false
-       }
-       guaranteed := qps.GetGuaranteedResource()
-       maxResource := qps.GetMaxResource()
-       absGuaranteed := resources.ComponentWiseMinPermissive(guaranteed, 
maxResource)
-       used := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
-
-       // if we don't fit, we're clearly above
-       if !absGuaranteed.FitIn(used) {
-               return true
-       }
-
-       usedOrMax := resources.ComponentWiseMax(guaranteed, used)
-       return resources.Equals(usedOrMax, used)
-}
-
-// IsWithinGuaranteedResource determines if this queue is within its current 
resource guarantees
-func (qps *QueuePreemptionSnapshot) IsWithinGuaranteedResource() bool {
-       if qps == nil {
-               return true
-       }
-       // check the parent, as violations at any level mean we are not within 
limits
-       if !qps.Parent.IsWithinGuaranteedResource() {
-               return false
-       }
-       guaranteed := qps.GetGuaranteedResource()
-
-       // if this is a leaf queue and we have not found any guaranteed 
resources, then we are never within guaranteed usage
-       if qps.Leaf && guaranteed.IsEmpty() {
-               return false
-       }
-       maxResource := qps.GetMaxResource()
-       absGuaranteed := resources.ComponentWiseMinPermissive(guaranteed, 
maxResource)
-       used := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
-       return absGuaranteed.FitIn(used)
-}
-
-func (qps *QueuePreemptionSnapshot) GetRemainingGuaranteed() 
*resources.Resource {
-       if qps == nil {
-               return nil
-       }
-       parentResult := qps.Parent.GetRemainingGuaranteed()
-       if parentResult == nil {
-               parentResult = resources.NewResource()
-       }
-       guaranteed := qps.GetGuaranteedResource()
-       maxResource := qps.GetMaxResource()
-       absGuaranteed := resources.ComponentWiseMinPermissive(guaranteed, 
maxResource)
-       used := resources.Sub(qps.AllocatedResource, qps.PreemptingResource)
-       remaining := resources.Sub(absGuaranteed, used)
-       return resources.ComponentWiseMin(remaining, parentResult)
-}
-
 func (qps *QueuePreemptionSnapshot) GetPreemptableResource() 
*resources.Resource {
        if qps == nil {
                return nil
@@ -783,7 +797,13 @@ func (qps *QueuePreemptionSnapshot) 
GetPreemptableResource() *resources.Resource
        if preemptableResource.IsEmpty() {
                return preemptableResource
        }
-       return resources.ComponentWiseMinPermissive(preemptableResource, 
parentPreemptableResource)
+
+       // Calculate the min of the current (leaf) and parent queue preemptable resources using the current (leaf)
+       // queue as the base, because the overall intention is to preempt something from the current (leaf) queue.
+       // Resource types that are not present in the current (leaf) queue but are available in the parent queue
+       // (possibly because of the current queue's siblings) are of no use here and would only give a wrong impression.
+       // So the minimum is derived only for the resource types in the current (leaf) queue's preemptable resource.
+       return resources.ComponentWiseMinOnlyExisting(preemptableResource, parentPreemptableResource)
 }
 
 func (qps *QueuePreemptionSnapshot) GetRemainingGuaranteedResource() 
*resources.Resource {
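
A small sketch of why the only-existing min matters for GetPreemptableResource
(hypothetical values): if the leaf queue could give back {"first": 5} while its
parent could give back {"first": 10, "second": 20}, the "second" capacity comes
from the leaf's siblings and must not be reported as preemptable on this queue:

    leaf := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
    parent := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
    resources.ComponentWiseMinOnlyExisting(leaf, parent) // {"first": 5}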
@@ -915,6 +935,11 @@ func preemptPredicateCheck(plugin 
api.ResourceManagerCallback, ch chan<- *predic
                }); err == nil {
                        result.success = true
                        result.index = -1
+               } else {
+                       log.Log(log.SchedPreemption).Debug("Normal predicate 
check failed",
+                               zap.String("AllocationKey", args.AllocationKey),
+                               zap.String("NodeID", args.NodeID),
+                               zap.Error(err))
                }
        } else if response := plugin.PreemptionPredicates(args); response != 
nil {
                // preemption check; at least one allocation will need 
preemption
diff --git a/pkg/scheduler/objects/preemption_queue_test.go 
b/pkg/scheduler/objects/preemption_queue_test.go
index 6ce96f5e..0baaffaf 100644
--- a/pkg/scheduler/objects/preemption_queue_test.go
+++ b/pkg/scheduler/objects/preemption_queue_test.go
@@ -45,9 +45,26 @@ func TestGetPreemptableResource(t *testing.T) {
        // guaranteed set but no usage. so nothing to preempt
        // clean start for the snapshot: whole hierarchy with guarantee
        smallestRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+
+       // clean start for the snapshot: all set guaranteed - new tests
+       // add usage to parent + root: use all guaranteed at parent level
+       // add usage to child2: use double than guaranteed
+       parentQ.guaranteedResource = smallestRes
+       rootQ.allocatedResource = resources.Multiply(smallestRes, 2)
+       parentQ.allocatedResource = resources.Multiply(smallestRes, 2)
+       childQ2.allocatedResource = resources.Multiply(smallestRes, 2)
+       rootPreemptable, pPreemptable, cPreemptable1, cPreemptable2 = getPreemptableResource(rootQ, parentQ, childQ1, childQ2)
+       assert.Assert(t, resources.Equals(rootPreemptable, resources.Multiply(smallestRes, 2)), "guaranteed not set in root queue, so whole usage is preemptable")
+       assert.Assert(t, resources.Equals(pPreemptable, smallestRes), "usage is twice the guaranteed in parent queue. preemptable resource should be equal to guaranteed res")
+       assert.Assert(t, resources.IsZero(cPreemptable1), "nothing to preempt as no usage in child1 queue")
+       assert.Assert(t, resources.Equals(cPreemptable2, smallestRes), "usage is twice the guaranteed in child2 queue. preemptable resource should be equal to guaranteed res")
+
        rootQ.guaranteedResource = resources.Multiply(smallestRes, 2)
        parentQ.guaranteedResource = smallestRes
        childQ2.guaranteedResource = smallestRes
+       rootQ.allocatedResource = nil
+       parentQ.allocatedResource = nil
+       childQ2.allocatedResource = nil
        childRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
        childQ1.guaranteedResource = childRes
        rootPreemptable, pPreemptable, cPreemptable1, cPreemptable2 = 
getPreemptableResource(rootQ, parentQ, childQ1, childQ2)
@@ -105,25 +122,36 @@ func TestGetPreemptableResource(t *testing.T) {
 }
 
 func TestGetRemainingGuaranteedResource(t *testing.T) {
-       // no guaranteed and no usage. so nothing to preempt
-       rootQ, err := createRootQueue(map[string]string{"first": "20"})
+       // no guaranteed and no usage. so no remaining
+       rootQ, err := createRootQueue(map[string]string{})
        assert.NilError(t, err)
        var parentQ, childQ1, childQ2 *Queue
-       parentQ, err = createManagedQueue(rootQ, "parent", true, 
map[string]string{"first": "10"})
+       parentQ, err = createManagedQueue(rootQ, "parent", true, 
map[string]string{})
        assert.NilError(t, err)
-       childQ1, err = createManagedQueue(parentQ, "child1", false, 
map[string]string{"first": "5"})
+       childQ1, err = createManagedQueue(parentQ, "child1", false, 
map[string]string{})
        assert.NilError(t, err)
-       childQ2, err = createManagedQueue(parentQ, "child2", false, 
map[string]string{"first": "5"})
+       childQ2, err = createManagedQueue(parentQ, "child2", false, 
map[string]string{})
        assert.NilError(t, err)
        rootRemaining, pRemaining, cRemaining1, cRemaining2 := 
getRemainingGuaranteed(rootQ, parentQ, childQ1, childQ2)
-       assert.Assert(t, resources.IsZero(rootRemaining), "guaranteed not set, 
so no remaining")
-       assert.Assert(t, resources.IsZero(pRemaining), "guaranteed not set, so 
no remaining")
-       assert.Assert(t, resources.IsZero(cRemaining1), "guaranteed not set, so 
no remaining")
-       assert.Assert(t, resources.IsZero(cRemaining2), "guaranteed not set, so 
no remaining")
+       assert.Assert(t, resources.IsZero(rootRemaining), "guaranteed and max 
res not set, so no remaining")
+       assert.Assert(t, resources.IsZero(pRemaining), "guaranteed and max res 
not set, so no remaining")
+       assert.Assert(t, resources.IsZero(cRemaining1), "guaranteed and max res 
not set, so no remaining")
+       assert.Assert(t, resources.IsZero(cRemaining2), "guaranteed and max res 
not set, so no remaining")
+
+       // no guaranteed and no usage, but max res set. remaining is the min of guaranteed and max, hence still zero
+       smallestRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
+       rootQ.maxResource = resources.Multiply(smallestRes, 4)
+       parentQ.maxResource = resources.Multiply(smallestRes, 2)
+       childQ1.maxResource = smallestRes
+       childQ2.maxResource = smallestRes
+       rootRemaining, pRemaining, cRemaining1, cRemaining2 = getRemainingGuaranteed(rootQ, parentQ, childQ1, childQ2)
+       assert.Assert(t, resources.IsZero(rootRemaining), "max res set but guaranteed not set, so no remaining")
+       assert.Assert(t, resources.IsZero(pRemaining), "max res set but guaranteed not set, so no remaining")
+       assert.Assert(t, resources.IsZero(cRemaining1), "max res set but guaranteed not set, so no remaining")
+       assert.Assert(t, resources.IsZero(cRemaining2), "max res set but guaranteed not set, so no remaining")
 
        // guaranteed set only for queue at specific levels but no usage.
        // so remaining for queues without guaranteed quota inherits from 
parent queue based on min perm calculation
-       smallestRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
        childRes := 
resources.NewResourceFromMap(map[string]resources.Quantity{"second": 5})
        rootQ.guaranteedResource = resources.Multiply(smallestRes, 2)
        childQ1.guaranteedResource = childRes
diff --git a/pkg/scheduler/objects/preemption_test.go 
b/pkg/scheduler/objects/preemption_test.go
index d57d3a63..bba1a9c2 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -30,9 +30,7 @@ import (
        "github.com/apache/yunikorn-core/pkg/plugins"
 )
 
-const appID3 = "app-3"
 const alloc = "alloc"
-const node1 = "node1"
 
 func TestCheckPreconditions(t *testing.T) {
        node := newNode("node1", map[string]resources.Quantity{"first": 5})
@@ -146,13 +144,13 @@ func 
TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
                t.Run(tt.testName, func(t *testing.T) {
                        node := newNode("node1", 
map[string]resources.Quantity{"first": 20})
                        iterator := getNodeIteratorFn(node)
-                       rootQ, err := 
createRootQueue(map[string]string{"first": "20"})
+                       rootQ, err := createRootQueue(nil)
                        assert.NilError(t, err)
-                       parentQ, err := createManagedQueueGuaranteed(rootQ, 
"parent", true, map[string]string{"first": "20"}, tt.parentGuaranteed)
+                       parentQ, err := createManagedQueueGuaranteed(rootQ, 
"parent", true, map[string]string{}, tt.parentGuaranteed)
                        assert.NilError(t, err)
-                       childQ1, err := createManagedQueueGuaranteed(parentQ, 
"child1", false, map[string]string{"first": "10"}, map[string]string{"first": 
"5"})
+                       childQ1, err := createManagedQueueGuaranteed(parentQ, 
"child1", false, map[string]string{}, map[string]string{"first": "5"})
                        assert.NilError(t, err)
-                       childQ2, err := createManagedQueueGuaranteed(parentQ, 
"child2", false, map[string]string{"first": "10"}, tt.childGuaranteed)
+                       childQ2, err := createManagedQueueGuaranteed(parentQ, 
"child2", false, map[string]string{}, tt.childGuaranteed)
                        assert.NilError(t, err)
                        app1 := newApplication(appID1, "default", 
"root.parent.child1")
                        app1.SetQueue(childQ1)
@@ -227,8 +225,8 @@ func TestTryPreemption(t *testing.T) {
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, alloc1.IsPreempted(), "alloc1 preempted")
-       assert.Check(t, !alloc2.IsPreempted(), "alloc2 not preempted")
+       assert.Check(t, alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
 }
 
 // TestTryPreemptionOnNode Test try preemption on node with simple queue hierarchy. Since the node doesn't have enough resources to accommodate the ask, preemption happens because of the node resource constraint.
@@ -241,9 +239,8 @@ func TestTryPreemption(t *testing.T) {
 // root.parent.child2. Guaranteed set, first: 5. Ask of first:5 is waiting for 
resources.
 // 1 Allocation on root.parent.child1 should be preempted to free up resources 
for ask arrived in root.parent.child2.
 func TestTryPreemptionOnNode(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode("node1", map[string]resources.Quantity{"first": 5, 
"pods": 1})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 5, 
"pods": 1})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5, 
"pods": 1})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5, 
"pods": 1})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -262,10 +259,10 @@ func TestTryPreemptionOnNode(t *testing.T) {
        ask2 := newAllocationAsk("alloc2", appID1, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "pods": 
1}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node1", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, 
childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -278,9 +275,18 @@ func TestTryPreemptionOnNode(t *testing.T) {
        headRoom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 
3})
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, 
iterator(), false)
 
+       // register predicate handler
+       preemptions := []mock.Preemption{
+               mock.NewPreemption(true, "alloc3", nodeID2, []string{"alloc2"}, 
0, 0),
+       }
+       plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
+       plugins.RegisterSchedulerPlugin(plugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
+       assert.Equal(t, nodeID2, alloc.nodeID, "wrong node")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
 }
@@ -295,9 +301,8 @@ func TestTryPreemptionOnNode(t *testing.T) {
 // root.parent.child2. Guaranteed set, first: 5. Ask of first:5 is waiting for 
resources.
 // 1 Allocation on root.parent.child1 should be preempted to free up resources 
for ask arrived in root.parent.child2.
 func TestTryPreemptionOnQueue(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode("node1", map[string]resources.Quantity{"first": 10, 
"pods": 2})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 10, 
"pods": 2})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10, 
"pods": 2})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10, 
"pods": 2})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -316,10 +321,10 @@ func TestTryPreemptionOnQueue(t *testing.T) {
        ask2 := newAllocationAsk("alloc2", appID1, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5, "pods": 
1}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node1", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, 
childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -332,9 +337,17 @@ func TestTryPreemptionOnQueue(t *testing.T) {
        headRoom := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods": 
3})
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, 
iterator(), false)
 
+       allocs := map[string]string{}
+       allocs["alloc3"] = nodeID2
+
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
+       plugins.RegisterSchedulerPlugin(plugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
+       assert.Equal(t, nodeID2, alloc.nodeID, "wrong node")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
 }
@@ -349,9 +362,8 @@ func TestTryPreemptionOnQueue(t *testing.T) {
 // root.parent.child2. Guaranteed set, first: 5. Ask of first:5 is waiting for 
resources.
 // 2 Allocations on root.parent.child1 have been found and considered as victims. Since the victims' total resource usage (first: 4) is less than the ask requirement (first: 5), preemption won't help. Hence, the victims are dropped.
 func TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode(node1, map[string]resources.Quantity{"first": 10, 
"pods": 2})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 10, 
"pods": 2})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10, 
"pods": 2})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10, 
"pods": 2})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(map[string]string{"first": "20", "pods": 
"5"})
        assert.NilError(t, err)
@@ -370,10 +382,10 @@ func 
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
        ask2 := newAllocationAsk("alloc2", appID1, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "pods": 
1}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node1", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, 
childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -391,7 +403,7 @@ func 
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
        assert.Equal(t, ok, false, "no victims found")
        assert.Equal(t, alloc, expectedAlloc, "wrong alloc")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
-       assert.Check(t, !alloc2.IsPreempted(), "alloc2 not preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
 }
 
 // TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource Test try preemption on queue with simple queue hierarchy. Since the node doesn't have enough resources to accommodate the ask, preemption happens because of the node resource constraint.
@@ -404,9 +416,8 @@ func 
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
 // root.parent.child2. Guaranteed set, first: 5. Ask of first:5 is waiting for 
resources.
 // 2 Allocations on root.parent.child1 have been found and considered as victims. Since the victims' total resource usage (first: 4) is less than the ask requirement (first: 5), preemption won't help. Hence, the victims are dropped.
 func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode(node1, map[string]resources.Quantity{"first": 5, "pods": 1})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 5, "pods": 1})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5, "pods": 1})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5, "pods": 1})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -425,10 +436,10 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
        ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "pods": 1}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node2", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -443,8 +454,8 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
 
        // register predicate handler
        preemptions := []mock.Preemption{
-               mock.NewPreemption(true, "alloc3", "node1", []string{"alloc1"}, 0, 0),
-               mock.NewPreemption(true, "alloc3", "node2", []string{"alloc2"}, 0, 0),
+               mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0),
+               mock.NewPreemption(true, "alloc3", nodeID2, []string{"alloc2"}, 0, 0),
        }
 
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
@@ -456,7 +467,7 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
        assert.Equal(t, ok, false, "no victims found")
        assert.Equal(t, alloc, expectedAlloc, "wrong alloc")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
-       assert.Check(t, !alloc2.IsPreempted(), "alloc2 not preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
 }
 
 // TestTryPreemption_VictimsAvailableOnDifferentNodes Test try preemption on queue with simple queue hierarchy. Since Node doesn't have enough resources to accommodate, preemption happens because of node resource constraint.
@@ -469,9 +480,8 @@ func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
 // root.parent.child2. Guaranteed set, first: 5. Ask of first:5 is waiting for resources.
 // 2 allocations on root.parent.child1 have been found, considered as victims, and preempted to free up resources for the ask.
 func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode(node1, map[string]resources.Quantity{"first": 5, "pods": 1})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 4, "pods": 1})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5, "pods": 1})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 4, "pods": 1})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -490,10 +500,10 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
        ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "pods": 1}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node2", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -508,22 +518,22 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
 
        // register predicate handler
        preemptions := []mock.Preemption{
-               mock.NewPreemption(true, "alloc3", "node1", []string{"alloc1"}, 0, 0),
-               mock.NewPreemption(true, "alloc3", "node2", []string{"alloc2"}, 0, 0),
+               mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0),
+               mock.NewPreemption(true, "alloc3", nodeID2, []string{"alloc2"}, 0, 0),
        }
-
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
 
        alloc, ok := preemptor.TryPreemption()
-       assert.Assert(t, ok, "no victims found")
-       assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, alloc1.IsPreempted(), "alloc1 preempted")
-       assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
+       var expectedAlloc *Allocation
+       assert.Equal(t, ok, false, "no victims found")
+       assert.Equal(t, alloc, expectedAlloc, "wrong alloc")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
+       assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
 }
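
The flipped expectations here encode a per-node reading of victim usefulness: victims spread across two nodes do not help an ask that must land on a single node, so TryPreemption now returns no allocation. A condensed sketch of such a per-node feasibility check (nodeFits is a hypothetical name; the scheduler's actual decision path is more involved):

    // nodeFits reports whether preempting the given victims on one node
    // would leave enough room on that same node for the ask.
    // Hypothetical helper, for illustration only.
    func nodeFits(nodeAvailable, victimsOnNode, ask *resources.Resource) bool {
            freed := resources.Add(nodeAvailable, victimsOnNode)
            return resources.StrictlyGreaterThanOrEquals(freed, ask)
    }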
 
-// TestTryPreemption_OnQueue_VictimsOnDifferentNodes Test try preemption on queue with simple queue hierarchy. Since Node has enough resources to accomodate, preemption happens because of queue resource constraint.
+// TestTryPreemption_OnQueue_VictimsOnDifferentNodes Test try preemption on queue with simple queue hierarchy. Since Node has enough resources to accommodate, preemption happens because of queue resource constraint.
 // Guaranteed and Max resource set on both victim queue path and preemptor queue paths. victim and preemptor queue are siblings.
 // Ask (Preemptor) resource type matches with all resource types of the victim. Guaranteed set only on that specific resource type.
 // Setup:
@@ -536,9 +546,8 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
 // option 1 >> option 2 >> option 3. In option 3, preempting the third allocation is unnecessary, so this option should be avoided.
 // Either option 1 or option 2 is fine, but not option 3.
 func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode(node1, map[string]resources.Quantity{"first": 30})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 30})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -559,10 +568,10 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
        ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        ask2.createTime = time.Now().Add(-1 * time.Minute)
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node2", ask2)
+       alloc2 := NewAllocation(nodeID2, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
 
@@ -577,7 +586,7 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
        ask4.createTime = time.Now()
        assert.NilError(t, app3.AddAllocationAsk(ask4))
 
-       alloc4 := NewAllocation("node2", ask4)
+       alloc4 := NewAllocation(nodeID2, ask4)
        app3.AddAllocation(alloc4)
        assert.Check(t, node2.AddAllocation(alloc4), "node alloc2 failed")
        assert.NilError(t, childQ3.IncAllocatedResource(ask4.GetAllocatedResource(), false))
@@ -591,7 +600,7 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
 
        allocs := map[string]string{}
-       allocs["alloc3"] = "node2"
+       allocs["alloc3"] = nodeID2
 
        plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
        plugins.RegisterSchedulerPlugin(plugin)
@@ -600,10 +609,10 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
-       assert.Equal(t, "node2", alloc.nodeID, "wrong alloc")
-       assert.Equal(t, "node1", alloc1.nodeID, "wrong alloc")
-       assert.Equal(t, "node2", alloc2.nodeID, "wrong alloc")
-       assert.Equal(t, "node2", alloc4.nodeID, "wrong alloc")
+       assert.Equal(t, nodeID2, alloc.nodeID, "wrong alloc")
+       assert.Equal(t, nodeID1, alloc1.nodeID, "wrong node")
+       assert.Equal(t, nodeID2, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID2, alloc4.nodeID, "wrong node")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
        assert.Check(t, alloc4.IsPreempted(), "alloc4 not preempted")
@@ -620,9 +629,8 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
 // root.parent.child3. Guaranteed not set. 1 Allocation is running on node2. Total usage is first:5.
 // High priority ask should not be touched and remaining 2 allocs should be preempted to free up resources
 func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
-       t.SkipNow()
-       node1 := newNode(node1, map[string]resources.Quantity{"first": 30})
-       node2 := newNode("node2", map[string]resources.Quantity{"first": 30})
+       node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
+       node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
        iterator := getNodeIteratorFn(node1, node2)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -642,15 +650,15 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
        assert.NilError(t, app1.AddAllocationAsk(ask1))
 
        // High priority ask, should not be considered as victim
-       ask2 := newAllocationAskPriority("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 1000)
+       ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
        ask2.createTime = time.Now().Add(-1 * time.Minute)
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation("node1", ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node1.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation("node2", ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app1.AddAllocation(alloc2)
-       assert.Check(t, node2.AddAllocation(alloc2), "node alloc2 failed")
+       assert.Check(t, node1.AddAllocation(alloc2), "node alloc2 failed")
 
        assert.NilError(t, childQ1.IncAllocatedResource(ask1.GetAllocatedResource(), false))
        assert.NilError(t, childQ1.IncAllocatedResource(ask2.GetAllocatedResource(), false))
@@ -659,11 +667,11 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
        app3.SetQueue(childQ3)
        childQ3.applications[appID3] = app3
 
-       ask4 := newAllocationAsk("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       ask4 := newAllocationAskPriority("alloc4", appID3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 1000)
        ask4.createTime = time.Now()
        assert.NilError(t, app3.AddAllocationAsk(ask4))
 
-       alloc4 := NewAllocation("node2", ask4)
+       alloc4 := NewAllocation(nodeID2, ask4)
        app3.AddAllocation(alloc4)
        assert.Check(t, node2.AddAllocation(alloc4), "node alloc2 failed")
        assert.NilError(t, childQ3.IncAllocatedResource(ask4.GetAllocatedResource(), false))
@@ -677,7 +685,7 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
 
        allocs := map[string]string{}
-       allocs["alloc3"] = "node2"
+       allocs["alloc3"] = nodeID1
 
        plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
        plugins.RegisterSchedulerPlugin(plugin)
@@ -686,13 +694,13 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
-       assert.Equal(t, "node2", alloc.nodeID, "wrong alloc")
-       assert.Equal(t, "node1", alloc1.nodeID, "wrong alloc")
-       assert.Equal(t, "node2", alloc2.nodeID, "wrong alloc")
-       assert.Equal(t, "node2", alloc4.nodeID, "wrong alloc")
-       assert.Check(t, alloc1.IsPreempted(), "alloc1 preempted")
-       assert.Check(t, !alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc4.IsPreempted(), "alloc2 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc1.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID2, alloc4.nodeID, "wrong node")
+       assert.Check(t, alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
+       assert.Check(t, !alloc4.IsPreempted(), "alloc4 preempted")
 }
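
The reworked priority expectations come down to a single guard during victim collection: an allocation whose priority is above the preempting ask's is never a candidate, regardless of where it runs. A minimal sketch of that guard (plain int32 values stand in for the priorities the scheduler reads from the asks):

    // eligibleByPriority reports whether a running allocation may be
    // considered a victim: only allocations at or below the preempting
    // ask's priority can be preempted. Illustrative sketch only.
    func eligibleByPriority(allocPriority, askPriority int32) bool {
            return allocPriority <= askPriority
    }

In the test above alloc4 carries priority 1000 via newAllocationAskPriority, so it survives, while the two default-priority allocations on node1 are preempted.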
 
 // TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide Test try preemption with 2 level queue hierarchy.
@@ -705,8 +713,7 @@ func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
 // root.parent.parent2.child3. No usage, no guaranteed set
 // 1 Allocation on root.parent.parent1.child2 should be preempted to free up resources for the ask that arrived in root.parent.parent1.child1.
 func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 3, "mem": 400})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3, "mem": 400})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -733,10 +740,10 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
        ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 200}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, childQ2.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -749,9 +756,18 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
        headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 2})
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
 
+       allocs := map[string]string{}
+       allocs["alloc3"] = nodeID1
+
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
+       plugins.RegisterSchedulerPlugin(plugin)
+       defer plugins.UnregisterSchedulerPlugins()
+
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
 }
@@ -766,8 +782,7 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
 // root.parent.parent2.child3. No usage, no guaranteed set
 // 1 Allocation on root.parent.parent1.child2 should be preempted to free up resources for the ask that arrived in root.parent.parent1.child1.
 func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 2, "mem": 400})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 2, "mem": 400})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -794,10 +809,10 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
        ask2 := newAllocationAsk("alloc2", appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 200}))
        ask2.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask2))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app1.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
        assert.NilError(t, childQ2.IncAllocatedResource(ask1.GetAllocatedResource(), false))
@@ -811,7 +826,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
        preemptor := NewPreemptor(app2, headRoom, 30*time.Second, ask3, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc3", node1, []string{"alloc2"}, 0, 0)}
+       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc2"}, 0, 0)}
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
@@ -819,6 +834,8 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
        alloc, ok := preemptor.TryPreemption()
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc3", alloc.allocationKey, "wrong alloc")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
 }
@@ -835,8 +852,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
 // 3rd allocation of vcores:1, mem: 100 should not be touched as preempting the same would make usage go below the guaranteed set on root.parent.parent2.child2.
 // All remaining three allocations of mem: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSides(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 5, "mem": 700})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5, "mem": 700})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -868,7 +884,7 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"mem": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -882,13 +898,13 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 100}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -907,10 +923,9 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
        allocs := map[string]string{}
-       allocs["alloc4"] = node1
-       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, preemptions)
+       allocs["alloc4"] = nodeID1
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
 
@@ -918,9 +933,12 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
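
The guarantee arithmetic on the victim side of this test can be phrased as one subtraction per queue: what a queue can afford to lose is its usage above its guaranteed share, the complement of the remaining guaranteed resource consulted in the queue.go change further below. A hedged sketch (preemptableOf is a hypothetical name; resources.SubEliminateNegative is assumed to clip negative quantities to zero):

    // preemptableOf returns usage above the guaranteed share; anything at
    // or below the guarantee must not be preempted away.
    // Hypothetical helper, for illustration only.
    func preemptableOf(used, guaranteed *resources.Resource) *resources.Resource {
            return resources.SubEliminateNegative(used, guaranteed)
    }

In the scenario above, preempting the vcores:1, mem: 100 allocation would push the victim queue below its guarantee, so that allocation stays untouched.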
 
 // TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSides Test try preemption with 2 level queue hierarchy. Since Node doesn't have enough resources to accommodate, preemption happens because of node resource constraint.
@@ -935,8 +953,7 @@ func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
 // 3rd allocation of vcores:1, mem: 100 should not be touched as preempting the same would make usage go below the guaranteed set on root.parent.parent2.child2.
 // All remaining three allocations of mem: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSides(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 3, "mem": 600})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3, "mem": 600})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -968,7 +985,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"mem": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -982,13 +999,13 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem": 100}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -1007,7 +1024,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
+       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", nodeID1, []string{"alloc3", "alloc2"}, 1, 1)}
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
@@ -1016,9 +1033,12 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
 
 // TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide Test try preemption with 2 level queue hierarchy. Since Node doesn't have enough resources to accommodate, preemption happens because of node resource constraint.
@@ -1033,8 +1053,7 @@ func TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
 // but the last allocation should not be touched as preempting the same would make usage go above the guaranteed set on the preemptor or ask queue root.parent.parent2.child1.
 // All remaining three allocations of gpu: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 5, "gpu": 300, "mem": 200})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5, "gpu": 300, "mem": 200})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -1066,7 +1085,7 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -1080,13 +1099,13 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -1105,10 +1124,9 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
        allocs := map[string]string{}
-       allocs["alloc4"] = node1
-       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, preemptions)
+       allocs["alloc4"] = nodeID1
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
 
@@ -1116,9 +1134,12 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
 
 // TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide Test try preemption with 2 level queue hierarchy. Since Node doesn't have enough resources to accommodate, preemption happens because of node resource constraint.
@@ -1133,8 +1154,7 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
 // but the last allocation should not be touched as preempting the same would make usage go above the guaranteed set on the preemptor or ask queue root.parent.parent2.child1.
 // All remaining three allocations of gpu: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 3, "gpu": 300, "mem": 200})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3, "gpu": 300, "mem": 200})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -1166,7 +1186,7 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -1180,13 +1200,13 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -1205,7 +1225,7 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
+       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", nodeID1, []string{"alloc3", "alloc2"}, 1, 1)}
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
@@ -1214,9 +1234,12 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
 
 // TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides Test try preemption with 2 level queue hierarchy.
@@ -1231,8 +1254,7 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
 // 3rd allocation of vcores:1 should not be touched as preempting the same would make usage go below the guaranteed set on root.parent.parent2.child2.
 // All remaining three allocations of gpu: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 5, "gpu": 700, "mem": 200})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5, "gpu": 700, "mem": 200})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -1264,7 +1286,7 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -1278,13 +1300,13 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -1304,10 +1326,9 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
        allocs := map[string]string{}
-       allocs["alloc4"] = node1
-       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, preemptions)
+       allocs["alloc4"] = nodeID1
+       plugin := mock.NewPreemptionPredicatePlugin(nil, allocs, nil)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
 
@@ -1315,9 +1336,12 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
 
 // TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides Test try preemption with 2 level queue hierarchy. Since Node doesn't have enough resources to accommodate, preemption happens because of node resource constraint.
@@ -1332,8 +1356,7 @@ func TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
 // 3rd allocation of vcores:1 should not be touched as preempting the same would make usage go below the guaranteed set on root.parent.parent2.child2.
 // All remaining three allocations of gpu: 100 each should not be touched at all as there is no matching resource type between these allocs and ask resource types.
 func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t *testing.T) {
-       t.SkipNow()
-       node := newNode(node1, map[string]resources.Quantity{"vcores": 3, "gpu": 700, "mem": 200})
+       node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3, "gpu": 700, "mem": 200})
        iterator := getNodeIteratorFn(node)
        rootQ, err := createRootQueue(nil)
        assert.NilError(t, err)
@@ -1365,7 +1388,7 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorS
                askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
                askN.createTime = time.Now().Add(-2 * time.Minute)
                assert.NilError(t, app1.AddAllocationAsk(askN))
-               allocN := NewAllocation(node1, askN)
+               allocN := NewAllocation(nodeID1, askN)
                app1.AddAllocation(allocN)
                assert.Check(t, node.AddAllocation(allocN), "node alloc1 failed")
        }
@@ -1379,13 +1402,13 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorS
        ask3 := newAllocationAsk("alloc3", appID2, resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
        ask3.createTime = time.Now()
        assert.NilError(t, app1.AddAllocationAsk(ask3))
-       alloc1 := NewAllocation(node1, ask1)
+       alloc1 := NewAllocation(nodeID1, ask1)
        app1.AddAllocation(alloc1)
        assert.Check(t, node.AddAllocation(alloc1), "node alloc1 failed")
-       alloc2 := NewAllocation(node1, ask2)
+       alloc2 := NewAllocation(nodeID1, ask2)
        app2.AddAllocation(alloc2)
        assert.Check(t, node.AddAllocation(alloc2), "node alloc2 failed")
-       alloc3 := NewAllocation(node1, ask3)
+       alloc3 := NewAllocation(nodeID1, ask3)
        app3.AddAllocation(alloc3)
        assert.Check(t, node.AddAllocation(alloc3), "node alloc3 failed")
 
@@ -1405,7 +1428,7 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorS
        preemptor := NewPreemptor(app4, headRoom, 30*time.Second, ask4, iterator(), false)
 
        // register predicate handler
-       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", node1, []string{"alloc3", "alloc2"}, 1, 1)}
+       preemptions := []mock.Preemption{mock.NewPreemption(true, "alloc4", nodeID1, []string{"alloc3", "alloc2"}, 1, 1)}
        plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
        plugins.RegisterSchedulerPlugin(plugin)
        defer plugins.UnregisterSchedulerPlugins()
@@ -1414,9 +1437,12 @@ func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorS
        assert.NilError(t, plugin.GetPredicateError())
        assert.Assert(t, ok, "no victims found")
        assert.Equal(t, "alloc4", alloc.allocationKey, "wrong alloc")
-       assert.Check(t, !alloc1.IsPreempted(), "alloc1 not preempted")
+       assert.Equal(t, nodeID1, alloc.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc2.nodeID, "wrong node")
+       assert.Equal(t, nodeID1, alloc3.nodeID, "wrong node")
+       assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, alloc2.IsPreempted(), "alloc2 not preempted")
-       assert.Check(t, alloc3.IsPreempted(), "alloc3 preempted")
+       assert.Check(t, alloc3.IsPreempted(), "alloc3 not preempted")
 }
 
 func TestSolutionScoring(t *testing.T) {
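
All of the tests touched above share one fixture pattern for predicate handling: the expected preemption outcome is declared up front, a mock plugin is registered, and it is unregistered when the test returns. Condensed from the hunks above (identifiers exactly as used in this patch):

    // Typical predicate fixture in these tests: declare the expected
    // preemption (ask key, node ID, victim keys), register the mock
    // plugin, and clean up via defer.
    preemptions := []mock.Preemption{
            mock.NewPreemption(true, "alloc3", nodeID1, []string{"alloc1"}, 0, 0),
    }
    plugin := mock.NewPreemptionPredicatePlugin(nil, nil, preemptions)
    plugins.RegisterSchedulerPlugin(plugin)
    defer plugins.UnregisterSchedulerPlugins()

Tests that only need node placement for the ask instead pass an allocs map (allocation key to node ID) as the second argument and nil for the preemptions list.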
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 9ae65ce1..ff4957e7 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1764,17 +1764,22 @@ func (sq *Queue) findEligiblePreemptionVictims(results map[string]*QueuePreempti
                        return
                }
 
+               victims := sq.createPreemptionSnapshot(results)
+
                // skip this queue if we are within guaranteed limits
-               guaranteed := resources.ComponentWiseMinPermissive(sq.GetActualGuaranteedResource(), sq.GetMaxResource())
-               if guaranteed.FitInMaxUndef(sq.GetAllocatedResource()) {
+               remaining := results[sq.QueuePath].GetRemainingGuaranteedResource()
+               if remaining != nil && resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {
                        return
                }
 
-               victims := sq.createPreemptionSnapshot(results)
-
                // walk allocations and select those that are equal or lower than current priority
                for _, app := range sq.GetCopyOfApps() {
                        for _, alloc := range app.GetAllAllocations() {
+                               // at least one of the ask's resource types should match the potential victim
+                               if !ask.GetAllocatedResource().MatchAny(alloc.allocatedResource) {
+                                       continue
+                               }
+
                                // skip tasks which require a specific node
                                if alloc.GetAsk().GetRequiredNode() != "" {
                                        continue
@@ -1840,7 +1845,7 @@ func (sq *Queue) findPreemptionFenceRoot(priorityMap map[string]int64, currentPr
        priorityMap[sq.QueuePath] = currentPriority
 
        // Return this queue as fence root if: 1. FencePreemptionPolicy is set 2. root queue 3. allocations in the queue reached maximum resources
-       if sq.parent == nil || sq.GetPreemptionPolicy() == policies.FencePreemptionPolicy || resources.Equals(sq.allocatedResource, sq.maxResource) {
+       if sq.parent == nil || sq.GetPreemptionPolicy() == policies.FencePreemptionPolicy || resources.Equals(sq.maxResource, sq.allocatedResource) {
                return sq
        }
        return sq.parent.findPreemptionFenceRoot(priorityMap, currentPriority)
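
The queue.go hunks above add two independent gates to victim collection; read in isolation (a condensed sketch of the new control flow, reusing the names the surrounding function already holds):

    // Gate 1: a queue whose remaining guaranteed resource is non-negative
    // across all defined types is still within its guaranteed share and
    // contributes no victims.
    if remaining != nil && resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {
            return
    }
    // Gate 2: an allocation sharing no resource type with the ask can
    // never help satisfy it, so it is skipped as a victim.
    if !ask.GetAllocatedResource().MatchAny(alloc.allocatedResource) {
            continue
    }

Note the ordering change as well: the preemption snapshot is now created before the guaranteed-limit check, so queues that are skipped as victim sources still appear in the results map; the queue_test.go snapshot count below changes from 3 to 5 for exactly this reason.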
diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go
index 318b4771..7cc13d64 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1697,7 +1697,7 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
 
        // verify no victims when no allocations exist
        snapshot := leaf1.FindEligiblePreemptionVictims(leaf1.QueuePath, ask)
-       assert.Equal(t, 3, len(snapshot), "wrong snapshot count") // leaf1, parent1, root
+       assert.Equal(t, 5, len(snapshot), "wrong snapshot count") // leaf1, leaf2, parent1, parent2, root
        assert.Equal(t, 0, len(victims(snapshot)), "found victims")
 
        // add two lower-priority allocs in leaf2
diff --git a/pkg/scheduler/objects/utilities_test.go b/pkg/scheduler/objects/utilities_test.go
index bc751553..11384b86 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -39,6 +39,7 @@ const (
        appID0    = "app-0"
        appID1    = "app-1"
        appID2    = "app-2"
+       appID3    = "app-3"
        aKey      = "alloc-1"
        aKey2     = "alloc-2"
        nodeID1   = "node-1"


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

