This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push: new aab9eb77 [YUNIKORN-2593] Remove partition from Allocation and AllocationAsk (#858) aab9eb77 is described below commit aab9eb772974b42679afdcf0749afc4528d56888 Author: Craig Condit <ccon...@apache.org> AuthorDate: Wed May 8 11:29:34 2024 -0500 [YUNIKORN-2593] Remove partition from Allocation and AllocationAsk (#858) The partition name is not required in Allocation and AllocationAsk, as it can be found in other contexts. Remove it to make the objects smaller. Closes: #858 --- pkg/scheduler/context.go | 12 ++++++------ pkg/scheduler/objects/allocation.go | 8 -------- pkg/scheduler/objects/allocation_ask.go | 8 -------- pkg/scheduler/objects/application.go | 24 ++++++++++++------------ pkg/scheduler/objects/preemption.go | 2 +- pkg/scheduler/partition.go | 2 +- pkg/scheduler/partition_test.go | 6 +++--- pkg/webservice/dao/allocation_ask_info.go | 1 - pkg/webservice/dao/allocation_info.go | 1 - pkg/webservice/handlers.go | 2 -- 10 files changed, 23 insertions(+), 43 deletions(-) diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index 3d514e67..2ecda58e 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -143,7 +143,7 @@ func (cc *ClusterContext) schedule() bool { metrics.GetSchedulerMetrics().ObserveSchedulingLatency(schedulingStart) if alloc.GetResult() == objects.Replaced { // communicate the removal to the RM - cc.notifyRMAllocationReleased(psc.RmID, alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, "replacing allocationKey: "+alloc.GetAllocationKey()) + cc.notifyRMAllocationReleased(psc.RmID, psc.Name, alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, "replacing allocationKey: "+alloc.GetAllocationKey()) } else { cc.notifyRMNewAllocation(psc.RmID, alloc) } @@ -564,7 +564,7 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate } allocations := partition.removeApplication(app.ApplicationID) if 
len(allocations) > 0 { - cc.notifyRMAllocationReleased(partition.RmID, allocations, si.TerminationType_STOPPED_BY_RM, + cc.notifyRMAllocationReleased(partition.RmID, partition.Name, allocations, si.TerminationType_STOPPED_BY_RM, fmt.Sprintf("Application %s Removed", app.ApplicationID)) } log.Log(log.SchedContext).Info("Application removed from partition", @@ -690,7 +690,7 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) { node.SendNodeRemovedEvent() // notify the shim allocations have been released from node if len(released) != 0 { - cc.notifyRMAllocationReleased(partition.RmID, released, si.TerminationType_STOPPED_BY_RM, + cc.notifyRMAllocationReleased(partition.RmID, partition.Name, released, si.TerminationType_STOPPED_BY_RM, fmt.Sprintf("Node %s Removed", node.NodeID)) } for _, confirm := range confirmed { @@ -845,7 +845,7 @@ func (cc *ClusterContext) processAllocationReleases(releases []*si.AllocationRel allocs, confirmed := partition.removeAllocation(toRelease) // notify the RM of the exact released allocations if len(allocs) > 0 { - cc.notifyRMAllocationReleased(rmID, allocs, si.TerminationType_STOPPED_BY_RM, "allocation remove as per RM request") + cc.notifyRMAllocationReleased(rmID, partition.Name, allocs, si.TerminationType_STOPPED_BY_RM, "allocation remove as per RM request") } // notify the RM of the confirmed allocations (placeholder swap & preemption) if confirmed != nil { @@ -887,7 +887,7 @@ func (cc *ClusterContext) notifyRMNewAllocation(rmID string, alloc *objects.Allo // Create a RM update event to notify RM of released allocations // Lock free call, all updates occur via events. 
-func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, released []*objects.Allocation, terminationType si.TerminationType, message string) { +func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, partitionName string, released []*objects.Allocation, terminationType si.TerminationType, message string) { c := make(chan *rmevent.Result) releaseEvent := &rmevent.RMReleaseAllocationEvent{ ReleasedAllocations: make([]*si.AllocationRelease, 0), @@ -897,7 +897,7 @@ func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, released []*ob for _, alloc := range released { releaseEvent.ReleasedAllocations = append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{ ApplicationID: alloc.GetApplicationID(), - PartitionName: alloc.GetPartitionName(), + PartitionName: partitionName, TerminationType: terminationType, Message: message, AllocationKey: alloc.GetAllocationKey(), diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index dd4c1f99..fff75116 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -25,7 +25,6 @@ import ( "go.uber.org/zap" - "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/locking" "github.com/apache/yunikorn-core/pkg/log" @@ -90,7 +89,6 @@ func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation { createTime: createTime, bindTime: time.Now(), nodeID: nodeID, - partitionName: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()), tags: ask.GetTagsClone(), priority: ask.GetPriority(), allocatedResource: ask.GetAllocatedResource().Clone(), @@ -139,7 +137,6 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation { ask := &AllocationAsk{ allocationKey: alloc.AllocationKey, applicationID: alloc.ApplicationID, - partitionName: alloc.PartitionName, allocatedResource: resources.NewResourceFromProto(alloc.ResourcePerAlloc), tags: 
CloneAllocationTags(alloc.AllocationTags), priority: alloc.Priority, @@ -204,11 +201,6 @@ func (a *Allocation) GetApplicationID() string { return a.applicationID } -// GetPartitionName returns the partition name for this allocation -func (a *Allocation) GetPartitionName() string { - return a.partitionName -} - // GetTaskGroup returns the task group name for this allocation func (a *Allocation) GetTaskGroup() string { return a.taskGroupName diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go index fc39a770..4701beae 100644 --- a/pkg/scheduler/objects/allocation_ask.go +++ b/pkg/scheduler/objects/allocation_ask.go @@ -36,7 +36,6 @@ type AllocationAsk struct { // Read-only fields allocationKey string applicationID string - partitionName string taskGroupName string // task group this allocation ask belongs to placeholder bool // is this a placeholder allocation ask createTime time.Time // the time this ask was created (used in reservations) @@ -89,8 +88,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) *AllocationAsk { allocationKey: ask.AllocationKey, allocatedResource: resources.NewResourceFromProto(ask.ResourceAsk), applicationID: ask.ApplicationID, - partitionName: ask.PartitionName, - tags: CloneAllocationTags(ask.Tags), createTime: time.Now(), priority: ask.Priority, @@ -132,11 +129,6 @@ func (aa *AllocationAsk) GetApplicationID() string { return aa.applicationID } -// GetPartitionName returns the partition name for this ask -func (aa *AllocationAsk) GetPartitionName() string { - return aa.partitionName -} - // allocate marks the ask as allocated and returns true if successful. An ask may not be allocated multiple times. 
func (aa *AllocationAsk) allocate() bool { aa.Lock() diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 8b280891..5ef35dea 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -356,7 +356,7 @@ func (sa *Application) timeoutStateTimer(expectedState string, event application alloc.SetReleased(true) toRelease = append(toRelease, alloc) } - sa.notifyRMAllocationReleased(sa.rmID, toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete") + sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete") sa.clearStateTimer() } else { // nolint: errcheck @@ -430,7 +430,7 @@ func (sa *Application) timeoutPlaceholderProcessing() { zap.String("AppID", sa.ApplicationID), zap.Int("placeholders being replaced", replacing), zap.Int("releasing placeholders", len(toRelease))) - sa.notifyRMAllocationReleased(sa.rmID, toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") + sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") // Case 2: in every other case fail the application, and notify the context about the expired placeholder asks default: log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders", @@ -449,10 +449,10 @@ func (sa *Application) timeoutPlaceholderProcessing() { zap.String("currentState", sa.CurrentState()), zap.Error(err)) } - sa.notifyRMAllocationAskReleased(sa.rmID, sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout") + sa.notifyRMAllocationAskReleased(sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout") sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT) // all allocations are placeholders but GetAllAllocations is locked and cannot be used - 
sa.notifyRMAllocationReleased(sa.rmID, sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") + sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout") // we are in an accepted or new state so nothing can be replaced yet: mark everything as timedout for _, phData := range sa.placeholderData { phData.TimedOut = phData.Count @@ -1144,7 +1144,7 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, zap.Stringer("placeholder resource", ph.GetAllocatedResource())) // release the placeholder and tell the RM ph.SetReleased(true) - sa.notifyRMAllocationReleased(sa.rmID, []*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible") + sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible") sa.appEvents.sendPlaceholderLargerEvent(ph, request) continue } @@ -1347,7 +1347,7 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo victim.MarkPreempted() } ask.MarkTriggeredPreemption() - sa.notifyRMAllocationReleased(sa.rmID, victims, si.TerminationType_PREEMPTED_BY_SCHEDULER, + sa.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER, "preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey()) return true } @@ -1943,7 +1943,7 @@ func (sa *Application) executeTerminatedCallback() { // notifyRMAllocationReleased send an allocation release event to the RM to if the event handler is configured // and at least one allocation has been released. 
// No locking must be called while holding the lock -func (sa *Application) notifyRMAllocationReleased(rmID string, released []*Allocation, terminationType si.TerminationType, message string) { +func (sa *Application) notifyRMAllocationReleased(released []*Allocation, terminationType si.TerminationType, message string) { // only generate event if needed if len(released) == 0 || sa.rmEventHandler == nil { return @@ -1951,13 +1951,13 @@ func (sa *Application) notifyRMAllocationReleased(rmID string, released []*Alloc c := make(chan *rmevent.Result) releaseEvent := &rmevent.RMReleaseAllocationEvent{ ReleasedAllocations: make([]*si.AllocationRelease, 0), - RmID: rmID, + RmID: sa.rmID, Channel: c, } for _, alloc := range released { releaseEvent.ReleasedAllocations = append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{ ApplicationID: alloc.GetApplicationID(), - PartitionName: alloc.GetPartitionName(), + PartitionName: sa.Partition, AllocationKey: alloc.GetAllocationKey(), TerminationType: terminationType, Message: message, @@ -1976,19 +1976,19 @@ func (sa *Application) notifyRMAllocationReleased(rmID string, released []*Alloc // notifyRMAllocationAskReleased send an ask release event to the RM to if the event handler is configured // and at least one ask has been released. 
// No locking must be called while holding the lock -func (sa *Application) notifyRMAllocationAskReleased(rmID string, released []*AllocationAsk, terminationType si.TerminationType, message string) { +func (sa *Application) notifyRMAllocationAskReleased(released []*AllocationAsk, terminationType si.TerminationType, message string) { // only generate event if needed if len(released) == 0 || sa.rmEventHandler == nil { return } releaseEvent := &rmevent.RMReleaseAllocationAskEvent{ ReleasedAllocationAsks: make([]*si.AllocationAskRelease, 0), - RmID: rmID, + RmID: sa.rmID, } for _, alloc := range released { releaseEvent.ReleasedAllocationAsks = append(releaseEvent.ReleasedAllocationAsks, &si.AllocationAskRelease{ ApplicationID: alloc.GetApplicationID(), - PartitionName: alloc.GetPartitionName(), + PartitionName: common.GetPartitionNameWithoutClusterID(sa.Partition), AllocationKey: alloc.GetAllocationKey(), TerminationType: terminationType, Message: message, diff --git a/pkg/scheduler/objects/preemption.go b/pkg/scheduler/objects/preemption.go index efac8fa1..e19929ed 100644 --- a/pkg/scheduler/objects/preemption.go +++ b/pkg/scheduler/objects/preemption.go @@ -569,7 +569,7 @@ func (p *Preemptor) TryPreemption() (*Allocation, bool) { p.ask.MarkTriggeredPreemption() // notify RM that victims should be released - p.application.notifyRMAllocationReleased(p.application.rmID, victims, si.TerminationType_PREEMPTED_BY_SCHEDULER, + p.application.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER, "preempting allocations to free up resources to run ask: "+p.ask.GetAllocationKey()) // reserve the selected node for the new allocation if it will fit diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index b36bfd5c..f2154136 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -46,7 +46,7 @@ import ( type PartitionContext struct { RmID string // the RM the partition belongs to - Name string // name of the partition 
(logging mainly) + Name string // name of the partition // Private fields need protection root *objects.Queue // start of the queue hierarchy diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index dffae4e6..eabb1ad4 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -2948,7 +2948,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) { // release placeholder: do what the context would do after the shim processing release := &si.AllocationRelease{ - PartitionName: ph.GetPartitionName(), + PartitionName: partition.Name, ApplicationID: appID1, AllocationKey: ph.GetAllocationKey(), TerminationType: si.TerminationType_TIMEOUT, @@ -3020,7 +3020,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) { for id, ph := range phs { assert.Assert(t, ph.IsReleased(), "placeholder %s should be released", id) release := &si.AllocationRelease{ - PartitionName: ph.GetPartitionName(), + PartitionName: partition.Name, ApplicationID: appID1, AllocationKey: ph.GetAllocationKey(), TerminationType: si.TerminationType_TIMEOUT, @@ -3087,7 +3087,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) { // replace the placeholder: do what the context would do after the shim processing release := &si.AllocationRelease{ - PartitionName: ph.GetPartitionName(), + PartitionName: partition.Name, ApplicationID: appID1, AllocationKey: ph.GetAllocationKey(), TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, diff --git a/pkg/webservice/dao/allocation_ask_info.go b/pkg/webservice/dao/allocation_ask_info.go index 5f865b6d..8516047e 100644 --- a/pkg/webservice/dao/allocation_ask_info.go +++ b/pkg/webservice/dao/allocation_ask_info.go @@ -32,7 +32,6 @@ type AllocationAskDAOInfo struct { Priority string `json:"priority,omitempty"` RequiredNodeID string `json:"requiredNodeId,omitempty"` ApplicationID string `json:"applicationId,omitempty"` - Partition string `json:"partition,omitempty"` Placeholder bool `json:"placeholder,omitempty"` 
TaskGroupName string `json:"taskGroupName,omitempty"` AllocationLog []*AllocationAskLogDAOInfo `json:"allocationLog,omitempty"` diff --git a/pkg/webservice/dao/allocation_info.go b/pkg/webservice/dao/allocation_info.go index 8c9ea2ff..aec6b994 100644 --- a/pkg/webservice/dao/allocation_info.go +++ b/pkg/webservice/dao/allocation_info.go @@ -28,7 +28,6 @@ type AllocationDAOInfo struct { Priority string `json:"priority,omitempty"` NodeID string `json:"nodeId,omitempty"` ApplicationID string `json:"applicationId,omitempty"` - Partition string `json:"partition,omitempty"` Placeholder bool `json:"placeholder,omitempty"` PlaceholderUsed bool `json:"placeholderUsed,omitempty"` TaskGroupName string `json:"taskGroupName,omitempty"` diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go index 04c70c50..5ccc31a5 100644 --- a/pkg/webservice/handlers.go +++ b/pkg/webservice/handlers.go @@ -226,7 +226,6 @@ func getAllocationDAO(alloc *objects.Allocation) *dao.AllocationDAOInfo { Priority: strconv.Itoa(int(alloc.GetPriority())), NodeID: alloc.GetNodeID(), ApplicationID: alloc.GetApplicationID(), - Partition: alloc.GetPartitionName(), Preempted: alloc.IsPreempted(), } return allocDAO @@ -327,7 +326,6 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) *dao.AllocationAskDAOInfo { Priority: strconv.Itoa(int(ask.GetPriority())), RequiredNodeID: ask.GetRequiredNode(), ApplicationID: ask.GetApplicationID(), - Partition: common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()), Placeholder: ask.IsPlaceholder(), TaskGroupName: ask.GetTaskGroup(), AllocationLog: getAllocationLogsDAO(ask.GetAllocationLog()), --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org