This is an automated email from the ASF dual-hosted git repository. ccondit pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push: new e17eafaa [YUNIKORN-2458] Remove ask repeats from AllocationAsk (#854) e17eafaa is described below commit e17eafaab1c80eb15c668cc4241441305a1282f7 Author: Craig Condit <ccon...@apache.org> AuthorDate: Wed Apr 24 09:55:21 2024 -0500 [YUNIKORN-2458] Remove ask repeats from AllocationAsk (#854) Simplify ask and allocation handling by removing support for repeated requests in a single ask. This is functionality that is not used by the shim. By removing support for repeated asks, we also ensure that there is a 1:1 relationship between ask and allocation. Closes: #854 --- go.mod | 2 +- go.sum | 4 +- pkg/examples/simple_example.go | 3 +- pkg/scheduler/health_checker_test.go | 2 +- pkg/scheduler/objects/allocation.go | 5 +- pkg/scheduler/objects/allocation_ask.go | 47 ++-- pkg/scheduler/objects/allocation_ask_test.go | 47 ++-- pkg/scheduler/objects/allocation_test.go | 6 +- pkg/scheduler/objects/application.go | 136 ++++++----- pkg/scheduler/objects/application_test.go | 184 +++++++------- pkg/scheduler/objects/queue_test.go | 11 +- .../objects/required_node_preemptor_test.go | 1 - pkg/scheduler/objects/utilities_test.go | 30 +-- pkg/scheduler/partition.go | 4 +- pkg/scheduler/partition_test.go | 126 +++++----- pkg/scheduler/scheduler_test.go | 21 +- pkg/scheduler/tests/application_tracking_test.go | 3 +- pkg/scheduler/tests/mockscheduler_test.go | 19 +- pkg/scheduler/tests/operation_test.go | 26 +- pkg/scheduler/tests/performance_test.go | 53 +++-- pkg/scheduler/tests/plugin_test.go | 6 +- pkg/scheduler/tests/recovery_test.go | 90 ++++--- pkg/scheduler/tests/smoke_test.go | 263 +++++++++++---------- pkg/scheduler/utilities_test.go | 44 ++-- pkg/webservice/dao/allocation_ask_info.go | 1 - pkg/webservice/handlers.go | 3 +- pkg/webservice/handlers_test.go | 33 ++- 27 files changed, 597 insertions(+), 573 deletions(-) diff --git a/go.mod b/go.mod index 39491005..7865af82 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core go 1.21 require ( - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38 + github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 85f36706..a76cb0cd 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38 h1:/02cjuc0xpQPZIGezL45QZ6muGI7dfesu9l38U9fbx0= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240422062544-b70081933c38/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a h1:H978zsTL2FvbRFnySO83DOFLO33PwHWFdmHvMoSVXsc= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/pkg/examples/simple_example.go b/pkg/examples/simple_example.go index 590cf6fd..7cfaf864 100644 --- a/pkg/examples/simple_example.go +++ b/pkg/examples/simple_example.go @@ -218,8 +218,7 @@ partitions: "vcore": {Value: 1}, }, }, - MaxAllocations: 20, - ApplicationID: "app-1", + ApplicationID: "app-1", }, }, RmID: "rm:123", diff --git a/pkg/scheduler/health_checker_test.go b/pkg/scheduler/health_checker_test.go index a3f97f2a..23fc8854 100644 --- a/pkg/scheduler/health_checker_test.go +++ b/pkg/scheduler/health_checker_test.go @@ -216,7 +216,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) { assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful") // remove the allocation from the node, so we will have an orphan allocation assigned to the app - node.RemoveAllocation("key-0") + node.RemoveAllocation("key") healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext) assert.Assert(t, healthInfo.HealthChecks[9].Succeeded, "The orphan allocation check on the node should be successful") assert.Assert(t, !healthInfo.HealthChecks[10].Succeeded, "The orphan allocation check on the app should not be successful") diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index 8812a891..ddca67f7 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -92,7 +92,7 @@ func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation { bindTime: time.Now(), nodeID: nodeID, partitionName: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()), - allocationID: ask.allocationKey + "-" + strconv.Itoa(ask.completedPendingAsk()), + allocationID: ask.allocationKey, tags: ask.GetTagsClone(), priority: ask.GetPriority(), allocatedResource: ask.GetAllocatedResource().Clone(), @@ -147,8 +147,7 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation { allocatedResource: resources.NewResourceFromProto(alloc.ResourcePerAlloc), tags: CloneAllocationTags(alloc.AllocationTags), priority: alloc.Priority, - pendingAskRepeat: 0, - maxAllocations: 1, + allocated: true, taskGroupName: alloc.TaskGroupName, placeholder: alloc.Placeholder, createTime: time.Unix(creationTime, 0), diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go index 6b8bce6e..9611b9f0 100644 --- a/pkg/scheduler/objects/allocation_ask.go +++ b/pkg/scheduler/objects/allocation_ask.go @@ -42,7 +42,6 @@ type AllocationAsk struct { execTimeout time.Duration // execTimeout for the allocation ask createTime time.Time // the time this ask was created (used in reservations) priority int32 - maxAllocations int32 requiredNode string allowPreemptSelf bool allowPreemptOther bool @@ -52,7 +51,7 @@ type AllocationAsk struct { resKeyWithoutNode string // the reservation key without node // Mutable fields which need protection - pendingAskRepeat int32 + allocated bool allocLog map[string]*AllocationLogEntry preemptionTriggered bool preemptCheckTime time.Time @@ -90,8 +89,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) *AllocationAsk { saa := &AllocationAsk{ allocationKey: ask.AllocationKey, allocatedResource: resources.NewResourceFromProto(ask.ResourceAsk), - pendingAskRepeat: ask.MaxAllocations, - maxAllocations: ask.MaxAllocations, applicationID: ask.ApplicationID, partitionName: ask.PartitionName, @@ -124,7 +121,7 @@ func (aa *AllocationAsk) String() string { if aa == nil { return "ask is nil" } - return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, PendingRepeats %d", aa.allocationKey, aa.applicationID, aa.allocatedResource, aa.GetPendingAskRepeat()) + return fmt.Sprintf("allocationKey %s, applicationID %s, Resource %s, Allocated %t", aa.allocationKey, aa.applicationID, aa.allocatedResource, aa.IsAllocated()) } // GetAllocationKey returns the allocation key for this ask @@ -142,26 +139,35 @@ func (aa *AllocationAsk) GetPartitionName() string { return aa.partitionName } -// updatePendingAskRepeat updates the pending ask repeat with the delta given. -// Update the pending ask repeat counter with the delta (pos or neg). The pending repeat is always 0 or higher. -// If the update would cause the repeat to go negative the update is discarded and false is returned. -// In all other cases the repeat is updated and true is returned. -func (aa *AllocationAsk) updatePendingAskRepeat(delta int32) bool { +// allocate marks the ask as allocated and returns true if successful. An ask may not be allocated multiple times. +func (aa *AllocationAsk) allocate() bool { aa.Lock() defer aa.Unlock() - if aa.pendingAskRepeat+delta >= 0 { - aa.pendingAskRepeat += delta - return true + if aa.allocated { + return false } - return false + aa.allocated = true + return true } -// GetPendingAskRepeat gets the number of repeat asks remaining -func (aa *AllocationAsk) GetPendingAskRepeat() int32 { +// deallocate marks the ask as pending and returns true if successful. An ask may not be deallocated multiple times. +func (aa *AllocationAsk) deallocate() bool { + aa.Lock() + defer aa.Unlock() + + if !aa.allocated { + return false + } + aa.allocated = false + return true +} + +// IsAllocated determines if this ask has been allocated yet +func (aa *AllocationAsk) IsAllocated() bool { aa.RLock() defer aa.RUnlock() - return aa.pendingAskRepeat + return aa.allocated } // GetCreateTime returns the time this ask was created @@ -336,13 +342,6 @@ func (aa *AllocationAsk) HasTriggeredScaleUp() bool { return aa.scaleUpTriggered } -// completedPendingAsk How many pending asks has been completed or processed so far? -func (aa *AllocationAsk) completedPendingAsk() int { - aa.RLock() - defer aa.RUnlock() - return int(aa.maxAllocations - aa.pendingAskRepeat) -} - func (aa *AllocationAsk) setReservationKeyForNode(node, resKey string) { aa.Lock() defer aa.Unlock() diff --git a/pkg/scheduler/objects/allocation_ask_test.go b/pkg/scheduler/objects/allocation_ask_test.go index 59bc47da..8c913ee0 100644 --- a/pkg/scheduler/objects/allocation_ask_test.go +++ b/pkg/scheduler/objects/allocation_ask_test.go @@ -46,43 +46,36 @@ func TestAskToString(t *testing.T) { func TestNewAsk(t *testing.T) { res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) siAsk := &si.AllocationAsk{ - AllocationKey: "ask-1", - ApplicationID: "app-1", - MaxAllocations: 1, - ResourceAsk: res.ToProto(), + AllocationKey: "ask-1", + ApplicationID: "app-1", + ResourceAsk: res.ToProto(), } ask := NewAllocationAskFromSI(siAsk) if ask == nil { t.Fatal("NewAllocationAskFromSI create failed while it should not") } askStr := ask.String() - expected := "allocationKey ask-1, applicationID app-1, Resource map[first:10], PendingRepeats 1" + expected := "allocationKey ask-1, applicationID app-1, Resource map[first:10], Allocated false" assert.Equal(t, askStr, expected, "Strings should have been equal") assert.Equal(t, "app-1|ask-1", ask.resKeyWithoutNode) //nolint:staticcheck } -func TestPendingAskRepeat(t *testing.T) { +func TestAskAllocateDeallocate(t *testing.T) { res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) ask := newAllocationAsk("alloc-1", "app-1", res) - assert.Equal(t, ask.GetPendingAskRepeat(), int32(1), "pending ask repeat should be 1") - if !ask.updatePendingAskRepeat(1) { - t.Errorf("increase of pending ask with 1 failed, expected repeat 2, current repeat: %d", ask.GetPendingAskRepeat()) - } - if !ask.updatePendingAskRepeat(-1) { - t.Errorf("decrease of pending ask with 1 failed, expected repeat 1, current repeat: %d", ask.GetPendingAskRepeat()) - } - if ask.updatePendingAskRepeat(-2) { - t.Errorf("decrease of pending ask with 2 did not fail, expected repeat 1, current repeat: %d", ask.GetPendingAskRepeat()) - } - if !ask.updatePendingAskRepeat(-1) { - t.Errorf("decrease of pending ask with 1 failed, expected repeat 0, current repeat: %d", ask.GetPendingAskRepeat()) - } + assert.Assert(t, !ask.IsAllocated(), "pending ask should return false for IsAllocated()") + assert.Assert(t, !ask.deallocate(), "attempt to deallocate pending ask should fail") + assert.Assert(t, ask.allocate(), "attempt to allocate pending ask should not fail") + assert.Assert(t, ask.IsAllocated(), "allocated ask should return true for IsAllocated()") + assert.Assert(t, !ask.allocate(), "attempt to allocate previously allocated ask should fail") + assert.Assert(t, ask.deallocate(), "deallocating previously allocated ask should succeed") + assert.Assert(t, !ask.IsAllocated(), "deallocated ask should return false for IsAllocated()") } // the create time should not be manipulated but we need it for reservation testing func TestGetCreateTime(t *testing.T) { res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) - ask := newAllocationAskRepeat("alloc-1", "app-1", res, 2) + ask := newAllocationAsk("alloc-1", "app-1", res) created := ask.GetCreateTime() // move time 10 seconds back ask.createTime = created.Add(time.Second * -10) @@ -192,10 +185,9 @@ func TestGetRequiredNode(t *testing.T) { func TestAllocationLog(t *testing.T) { res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) siAsk := &si.AllocationAsk{ - AllocationKey: "ask-1", - ApplicationID: "app-1", - MaxAllocations: 1, - ResourceAsk: res.ToProto(), + AllocationKey: "ask-1", + ApplicationID: "app-1", + ResourceAsk: res.ToProto(), } ask := NewAllocationAskFromSI(siAsk) @@ -233,10 +225,9 @@ func TestAllocationLog(t *testing.T) { func TestSendPredicateFailed(t *testing.T) { res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) siAsk := &si.AllocationAsk{ - AllocationKey: "ask-1", - ApplicationID: "app-1", - MaxAllocations: 1, - ResourceAsk: res.ToProto(), + AllocationKey: "ask-1", + ApplicationID: "app-1", + ResourceAsk: res.ToProto(), } ask := NewAllocationAskFromSI(siAsk) eventSystem := mock.NewEventSystemDisabled() diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go index 4d92ebca..e1c1bfe4 100644 --- a/pkg/scheduler/objects/allocation_test.go +++ b/pkg/scheduler/objects/allocation_test.go @@ -53,7 +53,7 @@ func TestNewAlloc(t *testing.T) { if alloc == nil { t.Fatal("NewAllocation create failed while it should not") } - assert.Equal(t, alloc.GetAllocationID(), "ask-1-0") + assert.Equal(t, alloc.GetAllocationID(), "ask-1") assert.Equal(t, alloc.GetResult(), Allocated, "New alloc should default to result Allocated") assert.Assert(t, resources.Equals(alloc.GetAllocatedResource(), res), "Allocated resource not set correctly") assert.Assert(t, !alloc.IsPlaceholder(), "ask should not have been a placeholder") @@ -62,7 +62,7 @@ func TestNewAlloc(t *testing.T) { alloc.SetInstanceType(instType1) assert.Equal(t, alloc.GetInstanceType(), instType1, "Instance type not set as expected") allocStr := alloc.String() - expected := "applicationID=app-1, allocationID=ask-1-0, allocationKey=ask-1, Node=node-1, result=Allocated" + expected := "applicationID=app-1, allocationID=ask-1, allocationKey=ask-1, Node=node-1, result=Allocated" assert.Equal(t, allocStr, expected, "Strings should have been equal") assert.Assert(t, !alloc.IsPlaceholderUsed(), fmt.Sprintf("Alloc should not be placeholder replacement by default: got %t, expected %t", alloc.IsPlaceholderUsed(), false)) created := alloc.GetCreateTime() @@ -131,7 +131,7 @@ func TestSIFromAlloc(t *testing.T) { assert.NilError(t, err, "Resource creation failed") expectedSI := &si.Allocation{ AllocationKey: "ask-1", - AllocationID: "ask-1-0", + AllocationID: "ask-1", NodeID: "node-1", ApplicationID: "app-1", ResourcePerAlloc: res.ToProto(), diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index cba48664..3c0e0179 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -576,8 +576,10 @@ func (sa *Application) removeAsksInternal(allocKey string, detail si.EventRecord toRelease += releases } if ask := sa.requests[allocKey]; ask != nil { - deltaPendingResource = resources.MultiplyBy(ask.GetAllocatedResource(), float64(ask.GetPendingAskRepeat())) - sa.pending = resources.Sub(sa.pending, deltaPendingResource) + if !ask.IsAllocated() { + deltaPendingResource = ask.GetAllocatedResource() + sa.pending = resources.Sub(sa.pending, deltaPendingResource) + } delete(sa.requests, allocKey) sa.sortedRequests.remove(ask) sa.appEvents.sendRemoveAskEvent(ask, detail) @@ -618,14 +620,14 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error { if ask == nil { return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID) } - if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.GetAllocatedResource()) { + if ask.IsAllocated() || resources.IsZero(ask.GetAllocatedResource()) { return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask) } - delta := resources.Multiply(ask.GetAllocatedResource(), int64(ask.GetPendingAskRepeat())) + delta := ask.GetAllocatedResource().Clone() var oldAskResource *resources.Resource = nil - if oldAsk := sa.requests[ask.GetAllocationKey()]; oldAsk != nil { - oldAskResource = resources.Multiply(oldAsk.GetAllocatedResource(), int64(oldAsk.GetPendingAskRepeat())) + if oldAsk := sa.requests[ask.GetAllocationKey()]; oldAsk != nil && !oldAsk.IsAllocated() { + oldAskResource = oldAsk.GetAllocatedResource().Clone() } // Check if we need to change state based on the ask added, there are two cases: @@ -683,9 +685,9 @@ func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) { sa.requests[ask.GetAllocationKey()] = ask // update app priority - repeat := ask.GetPendingAskRepeat() + allocated := ask.IsAllocated() priority := ask.GetPriority() - if repeat > 0 && priority > sa.askMaxPriority { + if !allocated && priority > sa.askMaxPriority { sa.askMaxPriority = priority sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority) } @@ -695,44 +697,60 @@ func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) { } } -func (sa *Application) UpdateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) { +func (sa *Application) AllocateAsk(allocKey string) (*resources.Resource, error) { sa.Lock() defer sa.Unlock() if ask := sa.requests[allocKey]; ask != nil { + return sa.allocateAsk(ask) + } + return nil, fmt.Errorf("failed to locate ask with key %s", allocKey) +} - return sa.updateAskRepeatInternal(ask, delta) +func (sa *Application) DeallocateAsk(allocKey string) (*resources.Resource, error) { + sa.Lock() + defer sa.Unlock() + if ask := sa.requests[allocKey]; ask != nil { + return sa.deallocateAsk(ask) } return nil, fmt.Errorf("failed to locate ask with key %s", allocKey) } -func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) { - // updating with delta does error checking internally - if !ask.updatePendingAskRepeat(delta) { - return nil, fmt.Errorf("ask repaeat not updated resulting repeat less than zero for ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID) +func (sa *Application) allocateAsk(ask *AllocationAsk) (*resources.Resource, error) { + if !ask.allocate() { + return nil, fmt.Errorf("unable to allocate previously allocated ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID) + } + + if ask.GetPriority() >= sa.askMaxPriority { + // recalculate downward + sa.updateAskMaxPriority() + } + + delta := resources.Multiply(ask.GetAllocatedResource(), -1) + sa.pending = resources.Add(sa.pending, delta) + // update the pending of the queue with the same delta + sa.queue.incPendingResource(delta) + + return delta, nil +} + +func (sa *Application) deallocateAsk(ask *AllocationAsk) (*resources.Resource, error) { + if !ask.deallocate() { + return nil, fmt.Errorf("unable to deallocate pending ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID) } askPriority := ask.GetPriority() - if ask.GetPendingAskRepeat() == 0 { - // ask removed - if askPriority >= sa.askMaxPriority { - // recalculate downward - sa.updateAskMaxPriority() - } - } else { - // ask added - if askPriority > sa.askMaxPriority { - // increase app priority - sa.askMaxPriority = askPriority - sa.queue.UpdateApplicationPriority(sa.ApplicationID, askPriority) - } + if askPriority > sa.askMaxPriority { + // increase app priority + sa.askMaxPriority = askPriority + sa.queue.UpdateApplicationPriority(sa.ApplicationID, askPriority) } - deltaPendingResource := resources.Multiply(ask.GetAllocatedResource(), int64(delta)) - sa.pending = resources.Add(sa.pending, deltaPendingResource) + delta := ask.GetAllocatedResource() + sa.pending = resources.Add(sa.pending, delta) // update the pending of the queue with the same delta - sa.queue.incPendingResource(deltaPendingResource) + sa.queue.incPendingResource(delta) - return deltaPendingResource, nil + return delta, nil } // HasReserved returns true if the application has any reservations. @@ -789,7 +807,11 @@ func (sa *Application) reserveInternal(node *Node, ask *AllocationAsk) error { return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID) } if !sa.canAskReserve(ask) { - return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat()) + if ask.IsAllocated() { + return fmt.Errorf("ask is already allocated") + } else { + return fmt.Errorf("ask is already reserved") + } } // check if we can reserve the node before reserving on the app if err := node.Reserve(sa, ask); err != nil { @@ -869,20 +891,21 @@ func (sa *Application) GetAskReservations(allocKey string) []string { return reservationKeys } -// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set -// larger than 1. It can never reserve more than the repeat number of nodes. +// Check if the allocation has already been reserved. An ask can never reserve more than one node. // No locking must be called while holding the lock func (sa *Application) canAskReserve(ask *AllocationAsk) bool { allocKey := ask.GetAllocationKey() - pending := int(ask.GetPendingAskRepeat()) - resNumber := sa.GetAskReservations(allocKey) - if len(resNumber) >= pending { - log.Log(log.SchedApplication).Debug("reservation exceeds repeats", - zap.String("askKey", allocKey), - zap.Int("askPending", pending), - zap.Int("askReserved", len(resNumber))) + if ask.IsAllocated() { + log.Log(log.SchedApplication).Debug("ask already allocated, no reservation allowed", + zap.String("askKey", allocKey)) + return false } - return pending > len(resNumber) + if len(sa.GetAskReservations(allocKey)) > 0 { + log.Log(log.SchedApplication).Debug("reservation already exists", + zap.String("askKey", allocKey)) + return false + } + return true } func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, userHeadRoom *resources.Resource, total *[]*AllocationAsk) { @@ -892,7 +915,7 @@ func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, user return } for _, request := range sa.sortedRequests { - if request.GetPendingAskRepeat() == 0 || !request.IsSchedulingAttempted() { + if request.IsAllocated() || !request.IsSchedulingAttempted() { continue } @@ -933,7 +956,7 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption userHeadroom := ugm.GetUserManager().Headroom(sa.queuePath, sa.ApplicationID, sa.user) // get all the requests from the app sorted in order for _, request := range sa.sortedRequests { - if request.GetPendingAskRepeat() == 0 { + if request.IsAllocated() { continue } // check if there is a replacement possible @@ -1097,7 +1120,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, for _, request := range sa.sortedRequests { // skip placeholders they follow standard allocation // this should also be part of a task group just make sure it is - if request.IsPlaceholder() || request.GetTaskGroup() == "" || request.GetPendingAskRepeat() == 0 { + if request.IsPlaceholder() || request.GetTaskGroup() == "" || request.IsAllocated() { continue } // walk over the placeholders, allow for processing all as we can have multiple task groups @@ -1144,9 +1167,9 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, alloc.SetResult(Replaced) // mark placeholder as released ph.SetReleased(true) - _, err := sa.updateAskRepeatInternal(request, -1) + _, err := sa.allocateAsk(request) if err != nil { - log.Log(log.SchedApplication).Warn("ask repeat update failed unexpectedly", + log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly", zap.Error(err)) } return alloc @@ -1195,9 +1218,9 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, zap.Stringer("placeholder", phFit)) return false } - _, err := sa.updateAskRepeatInternal(reqFit, -1) + _, err := sa.allocateAsk(reqFit) if err != nil { - log.Log(log.SchedApplication).Warn("ask repeat update failed unexpectedly", + log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly", zap.Error(err)) } @@ -1226,7 +1249,7 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte for _, reserve := range sa.reservations { ask := sa.requests[reserve.askKey] // sanity check and cleanup if needed - if ask == nil || ask.GetPendingAskRepeat() == 0 { + if ask == nil || ask.IsAllocated() { var unreserveAsk *AllocationAsk // if the ask was not found we need to construct one to unreserve if ask == nil { @@ -1372,7 +1395,7 @@ func (sa *Application) tryNodes(ask *AllocationAsk, iterator NodeIterator) *Allo // check if the ask is reserved or not allocKey := ask.GetAllocationKey() reservedAsks := sa.GetAskReservations(allocKey) - allowReserve := len(reservedAsks) < int(ask.GetPendingAskRepeat()) + allowReserve := !ask.IsAllocated() && len(reservedAsks) == 0 var allocResult *Allocation iterator.ForEachNode(func(node *Node) bool { // skip the node if the node is not valid for the ask @@ -1450,8 +1473,7 @@ func (sa *Application) tryNodes(ask *AllocationAsk, iterator NodeIterator) *Allo zap.String("appID", sa.ApplicationID), zap.String("nodeID", nodeToReserve.NodeID), zap.String("allocationKey", allocKey), - zap.Int("reservations", len(reservedAsks)), - zap.Int32("pendingRepeats", ask.GetPendingAskRepeat())) + zap.Int("reservations", len(reservedAsks))) // skip the node if conditions can not be satisfied if !nodeToReserve.preReserveConditions(ask) { return nil @@ -1487,10 +1509,10 @@ func (sa *Application) tryNode(node *Node, ask *AllocationAsk) *Allocation { node.RemoveAllocation(alloc.GetAllocationID()) return nil } - // mark this ask as allocated by lowering the repeat - _, err := sa.updateAskRepeatInternal(ask, -1) + // mark this ask as allocated + _, err := sa.allocateAsk(ask) if err != nil { - log.Log(log.SchedApplication).Warn("ask repeat update failed unexpectedly", + log.Log(log.SchedApplication).Warn("allocation of ask failed unexpectedly", zap.Error(err)) } // all is OK, last update for the app @@ -1816,7 +1838,7 @@ func (sa *Application) removeAllocationInternal(allocationID string, releaseType func (sa *Application) updateAskMaxPriority() { value := configs.MinPriority for _, v := range sa.requests { - if v.GetPendingAskRepeat() == 0 { + if v.IsAllocated() { continue } value = max(value, v.GetPriority()) diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 75e6da5a..83984d34 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -272,11 +272,14 @@ func TestAppAllocReservation(t *testing.T) { // reserve 1 allocate ask res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 2) + ask := newAllocationAsk(aKey, appID1, res) + ask2 := newAllocationAsk(aKey2, appID1, res) node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10}) // reserve that works err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") + err = app.AddAllocationAsk(ask2) + assert.NilError(t, err, "ask2 should have been added to app") err = app.Reserve(node1, ask) assert.NilError(t, err, "reservation should not have failed") if len(app.GetAskReservations("")) != 0 { @@ -290,17 +293,17 @@ func TestAppAllocReservation(t *testing.T) { nodeID2 := "node-2" node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10}) - err = app.Reserve(node2, ask) + err = app.Reserve(node2, ask2) assert.NilError(t, err, "reservation should not have failed: error %v", err) - nodeKey2 := nodeID2 + "|" + aKey - askReserved = app.GetAskReservations(aKey) - if len(askReserved) != 2 && (askReserved[0] != nodeKey2 || askReserved[1] != nodeKey2) { + nodeKey2 := nodeID2 + "|" + aKey2 + askReserved = app.GetAskReservations(aKey2) + if len(askReserved) != 1 && askReserved[0] != nodeKey2 { t.Errorf("app should have reservations for %s on %s and has not", aKey, nodeID2) } - // check exceeding ask repeat: nothing should change + // check duplicate reserve: nothing should change if app.canAskReserve(ask) { - t.Error("ask has maximum repeats reserved, reserve check should have failed") + t.Error("ask has already reserved, reserve check should have failed") } node3 := newNode("node-3", map[string]resources.Quantity{"first": 10}) err = app.Reserve(node3, ask) @@ -308,9 +311,12 @@ func TestAppAllocReservation(t *testing.T) { t.Errorf("reservation should have failed") } askReserved = app.GetAskReservations(aKey) - if len(askReserved) != 2 && (askReserved[0] != nodeKey1 || askReserved[1] != nodeKey1) && - (askReserved[0] != nodeKey2 || askReserved[1] != nodeKey2) { - t.Errorf("app should have reservations for node %s and %s and has not: %v", nodeID1, nodeID2, askReserved) + if len(askReserved) != 1 && askReserved[0] != nodeKey1 { + t.Errorf("app should have reservations for node %s and has not: %v", nodeID1, askReserved) + } + askReserved = app.GetAskReservations(aKey2) + if len(askReserved) != 1 && askReserved[0] != nodeKey2 { + t.Errorf("app should have reservations for node %s and has not: %v", nodeID2, askReserved) } // clean up all asks and reservations reservedAsks := app.RemoveAllocationAsk("") @@ -319,8 +325,7 @@ func TestAppAllocReservation(t *testing.T) { } } -// test update allocation repeat -func TestUpdateRepeat(t *testing.T) { +func TestAllocateDeallocate(t *testing.T) { app := newApplication(appID1, "default", "root.unknown") if app == nil || app.ApplicationID != appID1 { t.Fatalf("app create failed which should not have %v", app) @@ -331,38 +336,39 @@ func TestUpdateRepeat(t *testing.T) { // failure cases var delta *resources.Resource - delta, err = app.UpdateAskRepeat("", 0) - if err == nil || delta != nil { - t.Error("empty ask key should not have been found") + if delta, err = app.AllocateAsk(""); err == nil || delta != nil { + t.Error("empty ask key should not have been found by AllocateAsk()") + } + if delta, err = app.AllocateAsk("unknown"); err == nil || delta != nil { + t.Error("unknown ask key should not have been found by AllocateAsk()") } - delta, err = app.UpdateAskRepeat("unknown", 0) - if err == nil || delta != nil { - t.Error("unknown ask key should not have been found") + if delta, err = app.DeallocateAsk(""); err == nil || delta != nil { + t.Error("empty ask key should not have been found by DeallocateAsk()") + } + if delta, err = app.DeallocateAsk("unknown"); err == nil || delta != nil { + t.Error("unknown ask key should not have been found by DeallocateAsk()") } // working cases res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 1) + ask := newAllocationAsk(aKey, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") - delta, err = app.UpdateAskRepeat(aKey, 0) - if err != nil || !resources.IsZero(delta) { - t.Errorf("0 increase should return zero delta and did not: %v, err %v", delta, err) + // allocate + if delta, err := app.AllocateAsk(aKey); err != nil || !resources.Equals(resources.Multiply(res, -1), delta) { + t.Errorf("AllocateAsk() did not return correct delta, err %v, expected %v got %v", err, resources.Multiply(res, -1), delta) } - delta, err = app.UpdateAskRepeat(aKey, 1) - if err != nil || !resources.Equals(res, delta) { - t.Errorf("increase did not return correct delta, err %v, expected %v got %v", err, res, delta) + // allocate again should fail + if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil { + t.Error("attempt to call Allocate() twice should have failed") } - - // decrease to zero - delta, err = app.UpdateAskRepeat(aKey, -2) - if err != nil || !resources.Equals(resources.Multiply(res, -2), delta) { - t.Errorf("decrease did not return correct delta, err %v, expected %v got %v", err, resources.Multiply(res, -2), delta) + // deallocate + if delta, err := app.DeallocateAsk(aKey); err != nil || !resources.Equals(res, delta) { + t.Errorf("DeallocateAsk() did not return correct delta, err %v, expected %v got %v", err, res, delta) } - // decrease to below zero - delta, err = app.UpdateAskRepeat(aKey, -1) - if err == nil || delta != nil { - t.Errorf("decrease did not return correct delta, err %v, delta %v", err, delta) + // deallocate again should fail + if delta, err := app.DeallocateAsk(aKey); err == nil || delta != nil { + t.Error("attempt to call Deallocate() twice should have failed") } } @@ -394,16 +400,10 @@ func TestAddAllocAsk(t *testing.T) { if err == nil { t.Errorf("zero resource ask should not have been added to app") } - res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask = newAllocationAskRepeat(aKey, appID1, res, 0) - err = app.AddAllocationAsk(ask) - if err == nil { - t.Errorf("ask with zero repeat should not have been added to app") - } // add alloc ask res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask = newAllocationAskRepeat(aKey, appID1, res, 1) + ask = newAllocationAsk(aKey, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") @@ -433,36 +433,21 @@ func TestAddAllocAsk(t *testing.T) { assert.Equal(t, si.EventRecord_APP_REQUEST, record.EventChangeDetail, "incorrect change detail, expected app request") eventSystem.Stop() - ask = newAllocationAskRepeat(aKey, appID1, res, 2) + // change resource + ask = newAllocationAsk(aKey, appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") pending = app.GetPendingResource() - if !resources.Equals(resources.Multiply(res, 2), pending) { + if !resources.Equals(resources.Multiply(res, 2), app.GetPendingResource()) { t.Errorf("pending resource not updated correctly, expected %v but was: %v", resources.Multiply(res, 2), pending) } - // change both resource and count - ask = newAllocationAskRepeat(aKey, appID1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 3}), 5) - err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "ask should have been updated on app") - pending = app.GetPendingResource() - if !resources.Equals(resources.Multiply(res, 3), app.GetPendingResource()) { - t.Errorf("pending resource not updated correctly, expected %v but was: %v", resources.Multiply(res, 3), pending) - } - - // test a decrease of repeat and back to start - ask = newAllocationAskRepeat(aKey, appID1, res, 1) - err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "ask should have been updated on app") - if !resources.Equals(res, app.GetPendingResource()) { - t.Errorf("pending resource not updated correctly, expected %v but was: %v", res, app.GetPendingResource()) - } // after all this is must still be in an accepted state assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") // test PlaceholderData tg1 := "tg-1" - ask = newAllocationAskTG(aKey, appID1, tg1, res, 1) + ask = newAllocationAskTG(aKey, appID1, tg1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") app.SetTimedOutPlaceholder(tg1, 1) @@ -477,7 +462,7 @@ func TestAddAllocAsk(t *testing.T) { assert.Equal(t, app.placeholderData[tg1].TimedOut, int64(1)) assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) - ask = newAllocationAskTG(aKey, appID1, tg1, res, 1) + ask = newAllocationAskTG(aKey, appID1, tg1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") assert.Equal(t, len(app.placeholderData), 1) @@ -488,7 +473,7 @@ func TestAddAllocAsk(t *testing.T) { assert.DeepEqual(t, app.placeholderData[tg1].MinResource, res) tg2 := "tg-2" - ask = newAllocationAskTG(aKey, appID1, tg2, res, 1) + ask = newAllocationAskTG(aKey, appID1, tg2, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been updated on app") assert.Equal(t, len(app.placeholderData), 2) @@ -511,7 +496,7 @@ func TestAllocAskStateChange(t *testing.T) { app.queue = queue res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 1) + ask := newAllocationAsk(aKey, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") @@ -553,20 +538,20 @@ func TestRecoverAllocAsk(t *testing.T) { assert.Equal(t, len(app.requests), 0, "nil ask should not be added") res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 1) + ask := newAllocationAsk(aKey, appID1, res) app.RecoverAllocationAsk(ask) assert.Equal(t, len(app.requests), 1, "ask should have been added") assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") assertUserGroupResource(t, getTestUserGroup(), nil) - ask = newAllocationAskRepeat("ask-2", appID1, res, 1) + ask = newAllocationAsk("ask-2", appID1, res) app.RecoverAllocationAsk(ask) assert.Equal(t, len(app.requests), 2, "ask should have been added, total should be 2") assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") assertUserGroupResource(t, getTestUserGroup(), nil) assert.Equal(t, 0, len(app.placeholderData)) - ask = newAllocationAskTG("ask-3", appID1, "testGroup", res, 1) + ask = newAllocationAskTG("ask-3", appID1, "testGroup", res) app.RecoverAllocationAsk(ask) phData := app.placeholderData assert.Equal(t, 1, len(phData)) @@ -677,16 +662,16 @@ func TestRemoveAllocAsk(t *testing.T) { // setup the allocs res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 2) + ask := newAllocationAsk(aKey, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask 1 should have been added to app") - ask = newAllocationAskRepeat("alloc-2", appID1, res, 2) + ask = newAllocationAsk(aKey2, appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask 2 should have been added to app") if len(app.requests) != 2 { t.Fatalf("missing asks from app expected 2 got %d", len(app.requests)) } - expected := resources.Multiply(res, 4) + expected := resources.Multiply(res, 2) if !resources.Equals(expected, app.GetPendingResource()) { t.Errorf("pending resource not updated correctly, expected %v but was: %v", expected, app.GetPendingResource()) } @@ -699,8 +684,7 @@ func TestRemoveAllocAsk(t *testing.T) { delta := app.GetPendingResource().Clone() reservedAsks = app.RemoveAllocationAsk(aKey) delta.SubFrom(app.GetPendingResource()) - expected = resources.Multiply(res, 2) - if !resources.Equals(delta, expected) || reservedAsks != 0 { + if !resources.Equals(delta, res) || reservedAsks != 0 { t.Errorf("ask should have been removed from app, err %v, expected delta %v but was: %v, (reserved released = %d)", err, expected, delta, reservedAsks) } reservedAsks = app.RemoveAllocationAsk("") @@ -723,12 +707,12 @@ func TestRemoveAllocAskWithPlaceholders(t *testing.T) { app.queue = queue res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 2) + ask := newAllocationAsk(aKey, appID1, res) ask.placeholder = true err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask 1 should have been added to app") - ask = newAllocationAskRepeat("alloc-2", appID1, res, 2) + ask = newAllocationAsk("alloc-2", appID1, res) ask.placeholder = true err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask 2 should have been added to app") @@ -749,14 +733,14 @@ func TestRemovePlaceholderAllocationWithNoRealAllocation(t *testing.T) { t.Fatalf("app create failed which should not have %v", app) } res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) - ask := newAllocationAskRepeat(aKey, appID1, res, 2) + ask := newAllocationAsk(aKey, appID1, res) ask.placeholder = true allocInfo := NewAllocation(nodeID1, ask) app.AddAllocation(allocInfo) err := app.handleApplicationEventWithLocking(RunApplication) assert.NilError(t, err, "no error expected new to accepted") - app.RemoveAllocation("alloc-1-0", si.TerminationType_UNKNOWN_TERMINATION_TYPE) + app.RemoveAllocation("alloc-1", si.TerminationType_UNKNOWN_TERMINATION_TYPE) assert.Equal(t, app.stateMachine.Current(), Completing.String()) assertUserGroupResource(t, getTestUserGroup(), nil) } @@ -813,7 +797,7 @@ func TestStateChangeOnUpdate(t *testing.T) { assert.Assert(t, app.IsRunning(), "Application should have stayed same, changed unexpectedly: %s", app.CurrentState()) // remove the allocation, ask has been removed so nothing left - app.RemoveAllocation(askID+"-0", si.TerminationType_UNKNOWN_TERMINATION_TYPE) + app.RemoveAllocation(askID, si.TerminationType_UNKNOWN_TERMINATION_TYPE) assert.Assert(t, app.IsCompleting(), "Application did not change as expected: %s", app.CurrentState()) assertUserGroupResource(t, getTestUserGroup(), nil) @@ -839,7 +823,7 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) { assert.Assert(t, app.IsNew(), "New application did not return new state: %s", app.CurrentState()) res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) askID := "ask-1" - ask := newAllocationAskTG(askID, appID1, "TG1", res, 1) + ask := newAllocationAskTG(askID, appID1, "TG1", res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") // app with ask, even for placeholder, should be accepted @@ -873,7 +857,7 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) { assertUserGroupResource(t, getTestUserGroup(), res) // first we have to remove the allocation itself - alloc := app.RemoveAllocation(askID+"-0", si.TerminationType_UNKNOWN_TERMINATION_TYPE) + alloc := app.RemoveAllocation(askID, si.TerminationType_UNKNOWN_TERMINATION_TYPE) assert.Assert(t, alloc != nil, "Nil allocation was returned") assert.Assert(t, app.IsAccepted(), "Application should have stayed in Accepted, changed unexpectedly: %s", app.CurrentState()) // removing the ask should move the application into the waiting state, because the allocation is only a placeholder allocation @@ -1416,7 +1400,7 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin assert.NilError(t, err, "Unexpected error when creating resource from map") // add the placeholder ask to the app tg1 := "tg-1" - phAsk := newAllocationAskTG("ask-1", appID1, tg1, res, 1) + phAsk := newAllocationAskTG("ask-1", appID1, tg1, res) err = app.AddAllocationAsk(phAsk) assert.NilError(t, err, "Application ask should have been added") assert.Assert(t, app.IsAccepted(), "Application should be in accepted state") @@ -1749,9 +1733,9 @@ func TestCanReplace(t *testing.T) { want bool }{ {"nil", nil, false}, - {"placeholder", newAllocationAskTG(aKey, appID1, tg1, res, 1), false}, + {"placeholder", newAllocationAskTG(aKey, appID1, tg1, res), false}, {"no TG", newAllocationAsk(aKey, appID1, res), false}, - {"no placeholder data", newAllocationAskAll(aKey, appID1, tg1, res, 1, false, 0), false}, + {"no placeholder data", newAllocationAskAll(aKey, appID1, tg1, res, false, 0), false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1760,14 +1744,14 @@ func TestCanReplace(t *testing.T) { } // add the placeholder data // available tg has one replacement open - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1, res, 1)) + app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg1, res)) // unavailable tg has NO replacement open (replaced) tg2 := "unavailable" - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2, res, 1)) + app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg2, res)) app.placeholderData[tg2].Replaced++ // unavailable tg has NO replacement open (timedout) tg3 := "timedout" - app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3, res, 1)) + app.addPlaceholderDataWithLocking(newAllocationAskTG(aKey, appID1, tg3, res)) app.placeholderData[tg3].TimedOut++ tests = []struct { name string @@ -1775,10 +1759,10 @@ func TestCanReplace(t *testing.T) { want bool }{ {"no TG", newAllocationAsk(aKey, appID1, res), false}, - {"TG mismatch", newAllocationAskAll(aKey, appID1, "unknown", res, 1, false, 0), false}, - {"TG placeholder used", newAllocationAskAll(aKey, appID1, tg2, res, 1, false, 0), false}, - {"TG placeholder timed out", newAllocationAskAll(aKey, appID1, tg3, res, 1, false, 0), false}, - {"TG placeholder available", newAllocationAskAll(aKey, appID1, tg1, res, 1, false, 0), true}, + {"TG mismatch", newAllocationAskAll(aKey, appID1, "unknown", res, false, 0), false}, + {"TG placeholder used", newAllocationAskAll(aKey, appID1, tg2, res, false, 0), false}, + {"TG placeholder timed out", newAllocationAskAll(aKey, appID1, tg3, res, false, 0), false}, + {"TG placeholder available", newAllocationAskAll(aKey, appID1, tg1, res, false, 0), true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -2009,25 +1993,25 @@ func TestMaxAskPriority(t *testing.T) { assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after re-adding asks") - // update repeat to zero for p=15 - _, err = app.UpdateAskRepeat(ask3.GetAllocationKey(), -1) + // update to allocated for p=15 + _, err = app.AllocateAsk(ask3.GetAllocationKey()) assert.NilError(t, err, "ask should have been updated") - assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=15 repeat to 0") + assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=15 to allocated") - // update repeat to zero for p=5 - _, err = app.UpdateAskRepeat(ask2.GetAllocationKey(), -1) + // update to allocated for p=5 + _, err = app.AllocateAsk(ask2.GetAllocationKey()) assert.NilError(t, err, "ask should have been updated") - assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 repeat to 0") + assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 to allocated") - // update repeat to 1 for p=5 - _, err = app.UpdateAskRepeat(ask2.GetAllocationKey(), 1) + // update to unallocated for p=5 + _, err = app.DeallocateAsk(ask2.GetAllocationKey()) assert.NilError(t, err, "ask should have been updated") - assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 repeat to 1") + assert.Equal(t, app.GetAskMaxPriority(), int32(10), "wrong priority after updating p=5 to unallocated") - // update repeat to 1 for p=15 - _, err = app.UpdateAskRepeat(ask3.GetAllocationKey(), 1) + // update to unallocated for p=15 + _, err = app.DeallocateAsk(ask3.GetAllocationKey()) assert.NilError(t, err, "ask should have been updated") - assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after updating p=15 repeat to 1") + assert.Equal(t, app.GetAskMaxPriority(), int32(15), "wrong priority after updating p=15 to unallocated") } func TestAskEvents(t *testing.T) { diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 0cd30393..6ced5642 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -544,10 +544,10 @@ func TestSortApplications(t *testing.T) { if len(sortedApp) != 1 || sortedApp[0].ApplicationID != appID1 { t.Errorf("sorted application is missing expected app: %v", sortedApp) } - // set 0 repeat - _, err = app.UpdateAskRepeat("alloc-1", -1) + // set allocated + _, err = app.AllocateAsk("alloc-1") if err != nil || len(leaf.sortApplications(false)) != 0 { - t.Errorf("app with ask but 0 pending resources should not be in sorted apps: %v (err = %v)", app, err) + t.Errorf("app with ask but no pending resources should not be in sorted apps: %v (err = %v)", app, err) } } @@ -1666,12 +1666,9 @@ func TestFindEligiblePreemptionVictims(t *testing.T) { parentMax := map[string]string{siCommon.Memory: "200"} parentGuar := map[string]string{siCommon.Memory: "100"} ask := createAllocationAsk("ask1", appID1, true, true, 0, res) - ask.pendingAskRepeat = 1 ask2 := createAllocationAsk("ask2", appID2, true, true, -1000, res) - ask2.pendingAskRepeat = 1 alloc2 := NewAllocation(nodeID1, ask2) ask3 := createAllocationAsk("ask3", appID2, true, true, -1000, res) - ask3.pendingAskRepeat = 1 alloc3 := NewAllocation(nodeID1, ask3) root, err := createRootQueue(map[string]string{siCommon.Memory: "1000"}) assert.NilError(t, err, "failed to create queue") @@ -2522,7 +2519,7 @@ func TestQueueRunningAppsForSingleAllocationApp(t *testing.T) { assert.Equal(t, app.CurrentState(), Running.String(), "app state should be running") assert.Equal(t, leaf.runningApps, uint64(1), "leaf should have 1 app running") - _, err = app.updateAskRepeatInternal(ask, -1) + _, err = app.allocateAsk(ask) assert.NilError(t, err, "failed to decrease pending resources") app.RemoveAllocation(alloc.GetAllocationID(), si.TerminationType_STOPPED_BY_RM) diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go b/pkg/scheduler/objects/required_node_preemptor_test.go index 9b852831..32bcdb2d 100644 --- a/pkg/scheduler/objects/required_node_preemptor_test.go +++ b/pkg/scheduler/objects/required_node_preemptor_test.go @@ -124,7 +124,6 @@ func TestSortAllocations(t *testing.T) { requiredAsk := createAllocationAsk("ask", "app1", true, true, 20, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})) - requiredAsk.pendingAskRepeat = 5 p := NewRequiredNodePreemptor(node, requiredAsk) prepareAllocationAsks(node) diff --git a/pkg/scheduler/objects/utilities_test.go b/pkg/scheduler/objects/utilities_test.go index 577e26ff..3c325046 100644 --- a/pkg/scheduler/objects/utilities_test.go +++ b/pkg/scheduler/objects/utilities_test.go @@ -40,6 +40,7 @@ const ( appID1 = "app-1" appID2 = "app-2" aKey = "alloc-1" + aKey2 = "alloc-2" aAllocationID = "alloc-allocationid-1" nodeID1 = "node-1" nodeID2 = "node-2" @@ -225,31 +226,26 @@ func newPlaceholderAlloc(appID, nodeID string, res *resources.Resource) *Allocat } func newAllocationAsk(allocKey, appID string, res *resources.Resource) *AllocationAsk { - return newAllocationAskAll(allocKey, appID, "", res, 1, false, 0) + return newAllocationAskAll(allocKey, appID, "", res, false, 0) } func newAllocationAskPriority(allocKey, appID string, res *resources.Resource, priority int32) *AllocationAsk { - return newAllocationAskAll(allocKey, appID, "", res, 1, false, priority) + return newAllocationAskAll(allocKey, appID, "", res, false, priority) } -func newAllocationAskRepeat(allocKey, appID string, res *resources.Resource, repeat int) *AllocationAsk { - return newAllocationAskAll(allocKey, appID, "", res, repeat, false, 0) +func newAllocationAskTG(allocKey, appID, taskGroup string, res *resources.Resource) *AllocationAsk { + return newAllocationAskAll(allocKey, appID, taskGroup, res, taskGroup != "", 0) } -func newAllocationAskTG(allocKey, appID, taskGroup string, res *resources.Resource, repeat int) *AllocationAsk { - return newAllocationAskAll(allocKey, appID, taskGroup, res, repeat, taskGroup != "", 0) -} - -func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, repeat int, placeholder bool, priority int32) *AllocationAsk { +func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, placeholder bool, priority int32) *AllocationAsk { ask := &si.AllocationAsk{ - AllocationKey: allocKey, - ApplicationID: appID, - PartitionName: "default", - ResourceAsk: res.ToProto(), - MaxAllocations: int32(repeat), - TaskGroupName: taskGroup, - Placeholder: placeholder, - Priority: priority, + AllocationKey: allocKey, + ApplicationID: appID, + PartitionName: "default", + ResourceAsk: res.ToProto(), + TaskGroupName: taskGroup, + Placeholder: placeholder, + Priority: priority, } return NewAllocationAskFromSI(ask) } diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index a72dc75c..3ac6a872 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -756,8 +756,8 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object // unlink the placeholder and allocation release.ClearReleases() alloc.ClearReleases() - // update the repeat on the real alloc to get it re-scheduled - _, err := app.UpdateAskRepeat(askAlloc.GetAsk().GetAllocationKey(), 1) + // mark ask as unallocated to get it re-scheduled + _, err := app.DeallocateAsk(askAlloc.GetAsk().GetAllocationKey()) if err == nil { log.Log(log.SchedPartition).Info("inflight placeholder replacement reversed due to node removal", zap.String("appID", askAlloc.GetApplicationID()), diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index e8f0e001..00a980a4 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -21,7 +21,6 @@ package scheduler import ( "fmt" "strconv" - "strings" "testing" "time" @@ -257,7 +256,7 @@ func TestAddNodeWithAllocations(t *testing.T) { // fail with a broken alloc ask = newAllocationAsk("alloc-1-allocationid", appID1, appRes) alloc = objects.NewAllocation(nodeID1, ask) - assert.Equal(t, alloc.GetAllocationID(), "alloc-1-allocationid-0") + assert.Equal(t, alloc.GetAllocationID(), "alloc-1-allocationid") // reset allocationID to empty alloc.SetAllocationID("") assert.Equal(t, alloc.GetAllocationID(), "") @@ -376,10 +375,10 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) { assert.Equal(t, 1, partition.getPhAllocationCount(), "number of active placeholders") // fake an ask that is used - ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1, false) + ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should be added to app") - _, err = app.UpdateAskRepeat(allocID, -1) + _, err = app.AllocateAsk(allocID) assert.NilError(t, err, "ask should have been updated without error") assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") assertLimits(t, getTestUserGroup(), appRes) @@ -470,7 +469,7 @@ func TestPlaceholderDataWithPlaceholderPreemption(t *testing.T) { newRes.MultiplyTo(4) phRes.MultiplyTo(7) - ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1, false) + ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) alloc := objects.NewAllocation(nodeID1, ask) allocs := []*objects.Allocation{alloc} @@ -596,7 +595,7 @@ func TestPlaceholderDataWithNodeRemoval(t *testing.T) { phRes.MultiplyTo(7) // add a node with allocation: must have the correct app1 added already - ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1, false) + ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) alloc := objects.NewAllocation(nodeID1, ask) allocs := []*objects.Allocation{alloc} @@ -679,7 +678,7 @@ func TestPlaceholderDataWithRemoval(t *testing.T) { phRes.MultiplyTo(7) // add a node with allocation: must have the correct app1 added already - ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 0, 1, false) + ask := newAllocationAskAll("ask-1", appID1, taskGroup, appRes, 1, false) alloc := objects.NewAllocation(nodeID1, ask) allocs := []*objects.Allocation{alloc} @@ -764,7 +763,7 @@ func TestRemoveNodeWithReplacement(t *testing.T) { nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) node1 := newNodeMaxResource(nodeID1, nodeRes) appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) - ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 0, 1, true) + ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1, true) ph := objects.NewAllocation(nodeID1, ask) allocs := []*objects.Allocation{ph} err = partition.AddNode(node1, allocs) @@ -779,10 +778,10 @@ func TestRemoveNodeWithReplacement(t *testing.T) { assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected") // fake an ask that is used - ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1, false) + ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should be added to app") - _, err = app.UpdateAskRepeat(allocID, -1) + _, err = app.AllocateAsk(allocID) assert.NilError(t, err, "ask should have been updated without error") assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") @@ -836,7 +835,7 @@ func TestRemoveNodeWithReal(t *testing.T) { nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) node1 := newNodeMaxResource(nodeID1, nodeRes) appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) - ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 0, 1, true) + ask := newAllocationAskAll("placeholder", appID1, taskGroup, appRes, 1, true) ph := objects.NewAllocation(nodeID1, ask) allocs := []*objects.Allocation{ph} err = partition.AddNode(node1, allocs) @@ -851,10 +850,10 @@ func TestRemoveNodeWithReal(t *testing.T) { assert.Equal(t, 2, partition.GetTotalNodeCount(), "node list was not updated as expected") // fake an ask that is used - ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, 1, false) + ask = newAllocationAskAll(allocID, appID1, taskGroup, appRes, 1, false) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should be added to app") - _, err = app.UpdateAskRepeat(allocID, -1) + _, err = app.AllocateAsk(allocID) assert.NilError(t, err, "ask should have been updated without error") assert.Assert(t, resources.IsZero(app.GetPendingResource()), "app should not have pending resources") @@ -1144,7 +1143,7 @@ func TestRemoveAppAllocs(t *testing.T) { assertLimits(t, getTestUserGroup(), appRes) ask = newAllocationAsk("alloc-1", appNotRemoved, appRes) - allocationID := "alloc-1-0" + allocationID := "alloc-1" alloc = objects.NewAllocation(nodeID1, ask) err = partition.addAllocation(alloc) assert.NilError(t, err, "add allocation to partition should not have failed") @@ -1552,14 +1551,14 @@ func TestTryAllocate(t *testing.T) { assert.NilError(t, err, "failed to add app-1 to partition") err = app.AddAllocationAsk(newAllocationAsk(allocID, appID1, res)) assert.NilError(t, err, "failed to add ask alloc-1 to app-1") - err = app.AddAllocationAsk(newAllocationAskPriority("alloc-2", appID1, res, 1, 2)) + err = app.AddAllocationAsk(newAllocationAskPriority("alloc-2", appID1, res, 2)) assert.NilError(t, err, "failed to add ask alloc-2 to app-1") app = newApplication(appID2, "default", "root.leaf") // add to the partition err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-2 to partition") - err = app.AddAllocationAsk(newAllocationAskPriority(allocID, appID2, res, 1, 2)) + err = app.AddAllocationAsk(newAllocationAskPriority(allocID, appID2, res, 2)) assert.NilError(t, err, "failed to add ask alloc-1 to app-2") expectedQueuesMaxLimits := make(map[string]map[string]interface{}) @@ -1699,7 +1698,10 @@ func TestRequiredNodeCancelNonDSReservations(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAskRepeat("alloc-1", appID1, res, 2) + ask := newAllocationAsk("alloc-1", appID1, res) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "failed to add ask to app") + ask = newAllocationAsk("alloc-2", appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask to app") // calculate the resource size using the repeat request (reuse is possible using proto conversions in ask) @@ -1774,10 +1776,14 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAskRepeat("alloc-1", appID1, res, 2) + ask := newAllocationAsk("alloc-1", appID1, res) ask.SetRequiredNode(nodeID1) err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "failed to add ask to app") + assert.NilError(t, err, "failed to add ask 1 to app") + ask = newAllocationAsk("alloc-2", appID1, res) + ask.SetRequiredNode(nodeID1) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "failed to add ask 2 to app") // calculate the resource size using the repeat request (reuse is possible using proto conversions in ask) res.MultiplyTo(2) assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") @@ -2309,10 +2315,13 @@ func TestAllocReserveNewNode(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAskRepeat("alloc-1", appID1, res, 2) + ask := newAllocationAsk("alloc-1", appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask to app") - // calculate the resource size using the repeat request (reuse is possible using proto conversions in ask) + ask2 := newAllocationAsk("alloc-2", appID1, res) + err = app.AddAllocationAsk(ask2) + assert.NilError(t, err, "failed to add ask2 to app") + // calculate the resource size for two asks res.MultiplyTo(2) assert.Assert(t, resources.Equals(res, app.GetPendingResource()), "pending resource not set as expected") assert.Assert(t, resources.Equals(res, partition.root.GetPendingResource()), "pending resource not set as expected on root queue") @@ -2438,10 +2447,14 @@ func TestTryAllocateWithReserved(t *testing.T) { err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app-1 to partition") - ask := newAllocationAskRepeat("alloc-1", appID1, res, 2) + ask := newAllocationAsk("alloc-1", appID1, res) err = app.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask alloc-1 to app") + ask2 := newAllocationAsk("alloc-2", appID1, res) + err = app.AddAllocationAsk(ask2) + assert.NilError(t, err, "failed to add ask alloc-2 to app") + // reserve one node: scheduling should happen on the other node2 := partition.GetNode(nodeID2) if node2 == nil { @@ -2491,9 +2504,11 @@ func TestScheduleRemoveReservedAsk(t *testing.T) { app := newApplication(appID1, "default", "root.parent.sub-leaf") err = partition.AddApplication(app) assert.NilError(t, err, "failed to add app app-1 to partition") - ask := newAllocationAskRepeat("alloc-1", appID1, res, 4) - err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "failed to add ask alloc-1 to app") + for i := 1; i <= 4; i++ { + ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID1, res) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, fmt.Sprintf("failed to add ask alloc-%d to app", i)) + } // calculate the resource size using the repeat request pending := resources.Multiply(res, 4) @@ -2509,12 +2524,12 @@ func TestScheduleRemoveReservedAsk(t *testing.T) { assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000})) // add a asks which should reserve - ask = newAllocationAskRepeat("alloc-2", appID1, res, 1) + ask := newAllocationAsk("alloc-5", appID1, res) err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "failed to add ask alloc-2 to app") - ask = newAllocationAskRepeat("alloc-3", appID1, res, 1) + assert.NilError(t, err, "failed to add ask alloc-5 to app") + ask = newAllocationAsk("alloc-6", appID1, res) err = app.AddAllocationAsk(ask) - assert.NilError(t, err, "failed to add ask alloc-3 to app") + assert.NilError(t, err, "failed to add ask alloc-6 to app") pending = resources.Multiply(res, 2) assert.Assert(t, resources.Equals(pending, app.GetPendingResource()), "pending resource not set as expected") // allocate so we get reservations @@ -2543,9 +2558,9 @@ func TestScheduleRemoveReservedAsk(t *testing.T) { // before confirming remove the ask: do what the scheduler does when it gets a request from a // shim in processAllocationReleaseByAllocationKey() // make sure we are counting correctly and leave the other reservation intact - removeAskID := "alloc-2" - if alloc.GetAllocationKey() == "alloc-3" { - removeAskID = "alloc-3" + removeAskID := "alloc-5" + if alloc.GetAllocationKey() == "alloc-6" { + removeAskID = "alloc-6" } released := app.RemoveAllocationAsk(removeAskID) assert.Equal(t, released, 1, "expected one reservations to be released") @@ -3521,18 +3536,10 @@ func TestAddAllocationAsk(t *testing.T) { assert.NilError(t, err, "failed to create resource") askKey := "ask-key-1" ask := si.AllocationAsk{ - AllocationKey: askKey, - ApplicationID: appID1, - ResourceAsk: res.ToProto(), - MaxAllocations: 0, - } - err = partition.addAllocationAsk(&ask) - if err == nil || !strings.Contains(err.Error(), "invalid") { - t.Fatalf("0 repeat ask should have returned invalid ask error: %v", err) + AllocationKey: askKey, + ApplicationID: appID1, + ResourceAsk: res.ToProto(), } - - // set the repeat and retry this should work - ask.MaxAllocations = 1 err = partition.addAllocationAsk(&ask) assert.NilError(t, err, "failed to add ask to app") if !resources.Equals(app.GetPendingResource(), res) { @@ -3915,7 +3922,7 @@ func TestUserHeadroom(t *testing.T) { res, err = resources.NewResourceFromConf(map[string]string{"memory": "3", "vcores": "3"}) assert.NilError(t, err, "failed to create resource") - ask := newAllocationAskRepeat("alloc-1", "app-5", res, 2) + ask := newAllocationAsk("alloc-1", "app-5", res) err = app5.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask to app") @@ -3936,7 +3943,7 @@ func TestUserHeadroom(t *testing.T) { assert.Equal(t, objects.AllocatedReserved, alloc.GetResult(), "allocation result should have been allocated") // create a reservation and ensure reservation has not been allocated because there is no headroom for the user - ask = newAllocationAskRepeat("alloc-2", "app-5", res, 2) + ask = newAllocationAsk("alloc-2", "app-5", res) err = app5.AddAllocationAsk(ask) assert.NilError(t, err, "failed to add ask to app") partition.reserve(app5, node2, ask) @@ -4418,22 +4425,19 @@ func TestCalculateOutstandingRequests(t *testing.T) { "memory": 1, }) siAsk1 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-1", - ApplicationID: appID1, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-1", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), } siAsk2 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-2", - ApplicationID: appID1, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-2", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), } siAsk3 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-3", - ApplicationID: appID2, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-3", + ApplicationID: appID2, + ResourceAsk: askResource.ToProto(), } err = partition.addAllocationAsk(siAsk1) assert.NilError(t, err) @@ -4511,18 +4515,18 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { assert.Equal(t, objects.Replaced, alloc.GetResult()) assert.Equal(t, "real-alloc", alloc.GetAllocationKey()) assert.Equal(t, "tg-1", alloc.GetTaskGroup()) - assert.Equal(t, "real-alloc-0", alloc.GetAllocationID()) + assert.Equal(t, "real-alloc", alloc.GetAllocationID()) // remove the terminated placeholder allocation released, confirmed := partition.removeAllocation(&si.AllocationRelease{ ApplicationID: appID1, TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, - AllocationKey: "real-alloc-0", - AllocationID: "placeholder-0", + AllocationKey: "real-alloc", + AllocationID: "placeholder", }) assert.Assert(t, released == nil, "unexpected released allocation") assert.Assert(t, confirmed != nil, "expected to have a confirmed allocation") assert.Equal(t, "real-alloc", confirmed.GetAllocationKey()) assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) - assert.Equal(t, "real-alloc-0", confirmed.GetAllocationID()) + assert.Equal(t, "real-alloc", confirmed.GetAllocationID()) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 5dc7cba5..dac29a04 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -46,22 +46,19 @@ func TestInspectOutstandingRequests(t *testing.T) { "memory": 1, }) siAsk1 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-1", - ApplicationID: appID1, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-1", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), } siAsk2 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-2", - ApplicationID: appID1, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-2", + ApplicationID: appID1, + ResourceAsk: askResource.ToProto(), } siAsk3 := &si.AllocationAsk{ - AllocationKey: "ask-uuid-3", - ApplicationID: appID2, - ResourceAsk: askResource.ToProto(), - MaxAllocations: 1, + AllocationKey: "ask-uuid-3", + ApplicationID: appID2, + ResourceAsk: askResource.ToProto(), } err = partition.addAllocationAsk(siAsk1) assert.NilError(t, err) diff --git a/pkg/scheduler/tests/application_tracking_test.go b/pkg/scheduler/tests/application_tracking_test.go index d14df297..9243b3dc 100644 --- a/pkg/scheduler/tests/application_tracking_test.go +++ b/pkg/scheduler/tests/application_tracking_test.go @@ -119,8 +119,7 @@ func TestApplicationHistoryTracking(t *testing.T) { "vcore": {Value: 1000}, }, }, - MaxAllocations: 1, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", diff --git a/pkg/scheduler/tests/mockscheduler_test.go b/pkg/scheduler/tests/mockscheduler_test.go index 05e93c1f..222aa390 100644 --- a/pkg/scheduler/tests/mockscheduler_test.go +++ b/pkg/scheduler/tests/mockscheduler_test.go @@ -146,16 +146,17 @@ func (m *mockScheduler) removeApp(appID, partition string) error { }) } -func (m *mockScheduler) addAppRequest(appID, allocID string, resource *si.Resource, repeat int32) error { +func (m *mockScheduler) addAppRequest(appID, allocID string, resource *si.Resource, repeat int) error { + asks := make([]*si.AllocationAsk, repeat) + for i := 0; i < repeat; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("%s-%d", allocID, i), + ApplicationID: appID, + ResourceAsk: resource, + } + } return m.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: allocID, - ApplicationID: appID, - ResourceAsk: resource, - MaxAllocations: repeat, - }, - }, + Asks: asks, RmID: m.rmID, }) } diff --git a/pkg/scheduler/tests/operation_test.go b/pkg/scheduler/tests/operation_test.go index 1e7f94c7..49e55a19 100644 --- a/pkg/scheduler/tests/operation_test.go +++ b/pkg/scheduler/tests/operation_test.go @@ -96,8 +96,17 @@ partitions: "vcore": {Value: 1000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, + }, + }, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -224,8 +233,17 @@ partitions: "vcore": {Value: 1000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, + }, + }, + ApplicationID: appID1, }, }, RmID: "rm:123", diff --git a/pkg/scheduler/tests/performance_test.go b/pkg/scheduler/tests/performance_test.go index 4749b5c5..bfbdc852 100644 --- a/pkg/scheduler/tests/performance_test.go +++ b/pkg/scheduler/tests/performance_test.go @@ -126,40 +126,43 @@ partitions: // Request pods app1NumPods := numPods / 2 - err = proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: int64(requestMem)}, - "vcore": {Value: int64(requestVcore)}, - }, + app1Asks := make([]*si.AllocationAsk, app1NumPods) + for i := 0; i < app1NumPods; i++ { + app1Asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-1-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: int64(requestMem)}, + "vcore": {Value: int64(requestVcore)}, }, - MaxAllocations: int32(app1NumPods), - ApplicationID: appID1, }, - }, + ApplicationID: appID1, + } + } + err = proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: app1Asks, RmID: "rm:123", }) if err != nil { b.Error(err.Error()) } - err = proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: int64(requestMem)}, - "vcore": {Value: int64(requestVcore)}, - }, + app2NumPods := numPods - app1NumPods + app2Asks := make([]*si.AllocationAsk, app2NumPods) + for i := 0; i < app2NumPods; i++ { + app2Asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-2-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: int64(requestMem)}, + "vcore": {Value: int64(requestVcore)}, }, - MaxAllocations: int32(numPods - app1NumPods), - ApplicationID: appID2, }, - }, + ApplicationID: appID2, + } + } + err = proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: app2Asks, RmID: "rm:123", }) if err != nil { @@ -167,6 +170,8 @@ partitions: } // Reset timer for this benchmark + duration = time.Since(startTime) + b.Logf("Total time to add %d pods in %s, %f per second", numPods, duration, float64(numPods)/duration.Seconds()) startTime = time.Now() b.ResetTimer() diff --git a/pkg/scheduler/tests/plugin_test.go b/pkg/scheduler/tests/plugin_test.go index c45421eb..467ffebc 100644 --- a/pkg/scheduler/tests/plugin_test.go +++ b/pkg/scheduler/tests/plugin_test.go @@ -97,8 +97,7 @@ partitions: "memory": {Value: 8}, }, }, - MaxAllocations: 1, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -122,8 +121,7 @@ partitions: "memory": {Value: 5}, }, }, - MaxAllocations: 1, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go index 09e0f26e..75c1f470 100644 --- a/pkg/scheduler/tests/recovery_test.go +++ b/pkg/scheduler/tests/recovery_test.go @@ -19,6 +19,7 @@ package tests import ( + "fmt" "testing" "gotest.tools/v3/assert" @@ -125,6 +126,16 @@ func TestSchedulerRecovery(t *testing.T) { err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ Asks: []*si.AllocationAsk{ + { + AllocationKey: "alloc-0", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + ApplicationID: appID1, + }, { AllocationKey: "alloc-1", ResourceAsk: &si.Resource{ @@ -133,8 +144,7 @@ func TestSchedulerRecovery(t *testing.T) { "vcore": {Value: 1}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -170,32 +180,23 @@ func TestSchedulerRecovery(t *testing.T) { waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", []string{"node-1:1234", "node-2:1234"}, 20, 1000) - // ask for two more resources - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-2", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 50}, - "vcore": {Value: 5}, - }, - }, - MaxAllocations: 2, - ApplicationID: appID1, - }, - { - AllocationKey: "alloc-3", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 100}, - "vcore": {Value: 5}, - }, + // ask for 4 more allocations + asks := make([]*si.AllocationAsk, 4) + mem := [4]int64{50, 100, 50, 100} + for i := 0; i < 4; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i+2), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: mem[i]}, + "vcore": {Value: 5}, }, - MaxAllocations: 2, - ApplicationID: appID1, }, - }, + ApplicationID: appID1, + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -391,8 +392,17 @@ func TestSchedulerRecovery2Allocations(t *testing.T) { "vcore": {Value: 1}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -800,8 +810,17 @@ partitions: "vcore": {Value: 1}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -1006,7 +1025,6 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen "vcore": {Value: 1}, }, }, - MaxAllocations: 1, ApplicationID: appID1, TaskGroupName: "tg-2", Placeholder: true, @@ -1028,7 +1046,6 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen "vcore": {Value: 1}, }, }, - MaxAllocations: 1, ApplicationID: appID1, TaskGroupName: "tg-1", }, @@ -1040,7 +1057,6 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen "vcore": {Value: 1}, }, }, - MaxAllocations: 1, ApplicationID: appID1, TaskGroupName: "tg-2", }, @@ -1057,13 +1073,13 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen { ApplicationID: appID1, PartitionName: "default", - AllocationID: "ph-alloc-1-0", + AllocationID: "ph-alloc-1", TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, }, { ApplicationID: appID1, PartitionName: "default", - AllocationID: "ph-alloc-2-0", + AllocationID: "ph-alloc-2", TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, }, }, @@ -1079,13 +1095,13 @@ func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen { ApplicationID: appID1, PartitionName: "default", - AllocationID: "real-alloc-1-0", + AllocationID: "real-alloc-1", TerminationType: si.TerminationType_STOPPED_BY_RM, }, { ApplicationID: appID1, PartitionName: "default", - AllocationID: "real-alloc-2-0", + AllocationID: "real-alloc-2", TerminationType: si.TerminationType_STOPPED_BY_RM, }, }, diff --git a/pkg/scheduler/tests/smoke_test.go b/pkg/scheduler/tests/smoke_test.go index 72f61fee..e25269dd 100644 --- a/pkg/scheduler/tests/smoke_test.go +++ b/pkg/scheduler/tests/smoke_test.go @@ -19,6 +19,7 @@ package tests import ( + "fmt" "strconv" "testing" "time" @@ -232,15 +233,24 @@ func TestBasicScheduler(t *testing.T) { err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ Asks: []*si.AllocationAsk{ { - AllocationKey: "alloc-1", + AllocationKey: "alloc-1a", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, + }, + }, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-1b", ResourceAsk: &si.Resource{ Resources: map[string]*si.Quantity{ "memory": {Value: 10000000}, "vcore": {Value: 1000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -274,30 +284,48 @@ func TestBasicScheduler(t *testing.T) { // Check allocated resources of nodes waitForAllocatedNodeResource(t, ms.scheduler.GetClusterContext(), ms.partitionName, []string{"node-1:1234", "node-2:1234"}, 20000000, 1000) - // ask for two more resources + // ask for 4 more tasks err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ Asks: []*si.AllocationAsk{ { - AllocationKey: "alloc-2", + AllocationKey: "alloc-2a", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 50000000}, + "vcore": {Value: 5000}, + }, + }, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2b", ResourceAsk: &si.Resource{ Resources: map[string]*si.Quantity{ "memory": {Value: 50000000}, "vcore": {Value: 5000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-3a", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 100000000}, + "vcore": {Value: 5000}, + }, + }, + ApplicationID: appID1, }, { - AllocationKey: "alloc-3", + AllocationKey: "alloc-3b", ResourceAsk: &si.Resource{ Resources: map[string]*si.Quantity{ "memory": {Value: 100000000}, "vcore": {Value: 5000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -432,20 +460,21 @@ func TestBasicSchedulerAutoAllocation(t *testing.T) { ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000) - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, + asks := make([]*si.AllocationAsk, 20) + for i := 0; i < 20; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, }, - MaxAllocations: 20, - ApplicationID: appID, }, - }, + ApplicationID: appID, + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -542,31 +571,22 @@ partitions: ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000) - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, - }, - MaxAllocations: 20, - ApplicationID: app1ID, - }, - { - AllocationKey: "alloc-2", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, + asks := make([]*si.AllocationAsk, 40) + appIDs := []string{app1ID, app2ID} + for i := 0; i < 40; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, }, - MaxAllocations: 20, - ApplicationID: app2ID, }, - }, + ApplicationID: appIDs[(i / 20)], + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -664,31 +684,22 @@ partitions: ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000) - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, - }, - MaxAllocations: 20, - ApplicationID: app1ID, - }, - { - AllocationKey: "alloc-2", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, + asks := make([]*si.AllocationAsk, 40) + appIDs := []string{app1ID, app2ID} + for i := 0; i < 40; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, }, - MaxAllocations: 20, - ApplicationID: app2ID, }, - }, + ApplicationID: appIDs[i/20], + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -862,20 +873,21 @@ partitions: ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: param.askMemory}, - "vcore": {Value: param.askCPU}, - }, + asks := make([]*si.AllocationAsk, param.numOfAsk) + for i := int32(0); i < param.numOfAsk; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: param.askMemory}, + "vcore": {Value: param.askCPU}, }, - MaxAllocations: param.numOfAsk, - ApplicationID: appID1, }, - }, + ApplicationID: appID1, + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -1112,31 +1124,22 @@ partitions: ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) ms.mockRM.waitForAcceptedNode(t, "node-2:1234", 1000) - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, - }, - MaxAllocations: 20, - ApplicationID: app1ID, - }, - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, + asks := make([]*si.AllocationAsk, 40) + appIDs := []string{app1ID, app2ID} + for i := 0; i < 40; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, }, - MaxAllocations: 20, - ApplicationID: app2ID, }, - }, + ApplicationID: appIDs[i/20], + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -1230,21 +1233,22 @@ partitions: waitForNewNode(t, context, node.NodeID, partition, 1000) } - // Request ask with 20 allocations - err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ - Asks: []*si.AllocationAsk{ - { - AllocationKey: "alloc-1", - ResourceAsk: &si.Resource{ - Resources: map[string]*si.Quantity{ - "memory": {Value: 10000000}, - "vcore": {Value: 1000}, - }, + // Request 20 allocations + asks := make([]*si.AllocationAsk, 20) + for i := 0; i < 20; i++ { + asks[i] = &si.AllocationAsk{ + AllocationKey: fmt.Sprintf("alloc-%d", i), + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, }, - MaxAllocations: 20, - ApplicationID: appID, }, - }, + ApplicationID: appID, + } + } + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: asks, RmID: "rm:123", }) @@ -1351,10 +1355,9 @@ func TestDupReleasesInGangScheduling(t *testing.T) { "vcore": {Value: 1000}, }, }, - TaskGroupName: "tg", - Placeholder: true, - MaxAllocations: 1, - ApplicationID: appID1, + TaskGroupName: "tg", + Placeholder: true, + ApplicationID: appID1, }, }, RmID: "rm:123", @@ -1386,10 +1389,9 @@ func TestDupReleasesInGangScheduling(t *testing.T) { "vcore": {Value: 1000}, }, }, - MaxAllocations: 1, - ApplicationID: appID1, - Placeholder: false, - TaskGroupName: "tg", + ApplicationID: appID1, + Placeholder: false, + TaskGroupName: "tg", }, }, RmID: "rm:123", @@ -1549,8 +1551,17 @@ partitions: "vcore": {Value: 1000}, }, }, - MaxAllocations: 2, - ApplicationID: appID1, + ApplicationID: appID1, + }, + { + AllocationKey: "alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10000000}, + "vcore": {Value: 1000}, + }, + }, + ApplicationID: appID1, }, }, RmID: "rm:123", diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index e37a9380..5736c524 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -497,44 +497,38 @@ func newApplicationTGTagsWithPhTimeout(appID, partition, queueName string, task } func newAllocationAskTG(allocKey, appID, taskGroup string, res *resources.Resource, placeHolder bool) *objects.AllocationAsk { - return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, 1, placeHolder) + return newAllocationAskAll(allocKey, appID, taskGroup, res, 1, placeHolder) } func newAllocationAsk(allocKey, appID string, res *resources.Resource) *objects.AllocationAsk { - return newAllocationAskRepeat(allocKey, appID, res, 1) + return newAllocationAskAll(allocKey, appID, "", res, 1, false) } -func newAllocationAskRepeat(allocKey, appID string, res *resources.Resource, repeat int32) *objects.AllocationAsk { - return newAllocationAskPriority(allocKey, appID, res, repeat, 1) +func newAllocationAskPriority(allocKey, appID string, res *resources.Resource, prio int32) *objects.AllocationAsk { + return newAllocationAskAll(allocKey, appID, "", res, prio, false) } -func newAllocationAskPriority(allocKey, appID string, res *resources.Resource, repeat int32, prio int32) *objects.AllocationAsk { - return newAllocationAskAll(allocKey, appID, "", res, repeat, prio, false) -} - -func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, repeat int32, prio int32, placeHolder bool) *objects.AllocationAsk { +func newAllocationAskAll(allocKey, appID, taskGroup string, res *resources.Resource, prio int32, placeHolder bool) *objects.AllocationAsk { return objects.NewAllocationAskFromSI(&si.AllocationAsk{ - AllocationKey: allocKey, - ApplicationID: appID, - PartitionName: "test", - ResourceAsk: res.ToProto(), - MaxAllocations: repeat, - Priority: prio, - TaskGroupName: taskGroup, - Placeholder: placeHolder, + AllocationKey: allocKey, + ApplicationID: appID, + PartitionName: "test", + ResourceAsk: res.ToProto(), + Priority: prio, + TaskGroupName: taskGroup, + Placeholder: placeHolder, }) } func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources.Resource) *objects.AllocationAsk { return objects.NewAllocationAskFromSI(&si.AllocationAsk{ - AllocationKey: allocKey, - ApplicationID: appID, - PartitionName: "default", - ResourceAsk: res.ToProto(), - MaxAllocations: 1, - Priority: prio, - TaskGroupName: taskGroup, - Placeholder: false, + AllocationKey: allocKey, + ApplicationID: appID, + PartitionName: "default", + ResourceAsk: res.ToProto(), + Priority: prio, + TaskGroupName: taskGroup, + Placeholder: false, PreemptionPolicy: &si.PreemptionPolicy{ AllowPreemptSelf: true, AllowPreemptOther: true, diff --git a/pkg/webservice/dao/allocation_ask_info.go b/pkg/webservice/dao/allocation_ask_info.go index 07fbd073..8097c5bc 100644 --- a/pkg/webservice/dao/allocation_ask_info.go +++ b/pkg/webservice/dao/allocation_ask_info.go @@ -29,7 +29,6 @@ type AllocationAskDAOInfo struct { AllocationTags map[string]string `json:"allocationTags,omitempty"` RequestTime int64 `json:"requestTime,omitempty"` ResourcePerAlloc map[string]int64 `json:"resource,omitempty"` - PendingCount int32 `json:"pendingCount,omitempty"` Priority string `json:"priority,omitempty"` RequiredNodeID string `json:"requiredNodeId,omitempty"` ApplicationID string `json:"applicationId,omitempty"` diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go index 7da91622..cab7fb77 100644 --- a/pkg/webservice/handlers.go +++ b/pkg/webservice/handlers.go @@ -326,7 +326,6 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo { AllocationTags: ask.GetTagsClone(), RequestTime: ask.GetCreateTime().UnixNano(), ResourcePerAlloc: ask.GetAllocatedResource().DAOMap(), - PendingCount: ask.GetPendingAskRepeat(), Priority: strconv.Itoa(int(ask.GetPriority())), RequiredNodeID: ask.GetRequiredNode(), ApplicationID: ask.GetApplicationID(), @@ -345,7 +344,7 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo { func getAllocationAsksDAO(asks []*objects.AllocationAsk) []*dao.AllocationAskDAOInfo { asksDAO := make([]*dao.AllocationAskDAOInfo, 0, len(asks)) for _, ask := range asks { - if ask.GetPendingAskRepeat() > 0 { + if !ask.IsAllocated() { asksDAO = append(asksDAO, getAllocationAskDAO(ask)) } } diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go index 4d8ca9ed..6c908e85 100644 --- a/pkg/webservice/handlers_test.go +++ b/pkg/webservice/handlers_test.go @@ -1302,15 +1302,15 @@ func TestGetPartitionNodes(t *testing.T) { if node.NodeID == node1ID { assert.Equal(t, node.NodeID, node1ID) assert.Equal(t, "alloc-1", node.Allocations[0].AllocationKey) - assert.Equal(t, "alloc-1-0", node.Allocations[0].UUID) - assert.Equal(t, "alloc-1-0", node.Allocations[0].AllocationID) + assert.Equal(t, "alloc-1", node.Allocations[0].UUID) + assert.Equal(t, "alloc-1", node.Allocations[0].AllocationID) assert.DeepEqual(t, attributesOfnode1, node.Attributes) assert.DeepEqual(t, map[string]int64{"memory": 50, "vcore": 30}, node.Utilized) } else { assert.Equal(t, node.NodeID, node2ID) assert.Equal(t, "alloc-2", node.Allocations[0].AllocationKey) - assert.Equal(t, "alloc-2-0", node.Allocations[0].UUID) - assert.Equal(t, "alloc-2-0", node.Allocations[0].AllocationID) + assert.Equal(t, "alloc-2", node.Allocations[0].UUID) + assert.Equal(t, "alloc-2", node.Allocations[0].AllocationID) assert.DeepEqual(t, attributesOfnode2, node.Attributes) assert.DeepEqual(t, map[string]int64{"memory": 30, "vcore": 50}, node.Utilized) } @@ -1385,12 +1385,11 @@ func TestGetQueueApplicationsHandler(t *testing.T) { Resources: map[string]*si.Quantity{"vcore": {Value: 1}}, } ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{ - ApplicationID: "app-1", - PartitionName: part.Name, - TaskGroupName: tg, - ResourceAsk: res, - Placeholder: true, - MaxAllocations: 1}) + ApplicationID: "app-1", + PartitionName: part.Name, + TaskGroupName: tg, + ResourceAsk: res, + Placeholder: true}) err := app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") app.SetTimedOutPlaceholder(tg, 1) @@ -1565,10 +1564,9 @@ func TestGetApplicationHandler(t *testing.T) { Resources: map[string]*si.Quantity{"vcore": {Value: 1}}, } ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{ - ApplicationID: "app-1", - PartitionName: part.Name, - ResourceAsk: res, - MaxAllocations: 1}) + ApplicationID: "app-1", + PartitionName: part.Name, + ResourceAsk: res}) err := app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") @@ -2473,10 +2471,9 @@ func prepareUserAndGroupContext(t *testing.T, config string) { Resources: map[string]*si.Quantity{"vcore": {Value: 1}}, } ask := objects.NewAllocationAskFromSI(&si.AllocationAsk{ - ApplicationID: "app-1", - PartitionName: part.Name, - ResourceAsk: res, - MaxAllocations: 1}) + ApplicationID: "app-1", + PartitionName: part.Name, + ResourceAsk: res}) err := app.AddAllocationAsk(ask) assert.NilError(t, err, "ask should have been added to app") --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org