This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new aab9eb77 [YUNIKORN-2593] Remove partition from Allocation and 
AllocationAsk (#858)
aab9eb77 is described below

commit aab9eb772974b42679afdcf0749afc4528d56888
Author: Craig Condit <ccon...@apache.org>
AuthorDate: Wed May 8 11:29:34 2024 -0500

    [YUNIKORN-2593] Remove partition from Allocation and AllocationAsk (#858)
    
    The partition name is not required in Allocation and AllocationAsk, as it 
can
    be found in other contexts. Remove it to make the objects smaller.
    
    Closes: #858
---
 pkg/scheduler/context.go                  | 12 ++++++------
 pkg/scheduler/objects/allocation.go       |  8 --------
 pkg/scheduler/objects/allocation_ask.go   |  8 --------
 pkg/scheduler/objects/application.go      | 24 ++++++++++++------------
 pkg/scheduler/objects/preemption.go       |  2 +-
 pkg/scheduler/partition.go                |  2 +-
 pkg/scheduler/partition_test.go           |  6 +++---
 pkg/webservice/dao/allocation_ask_info.go |  1 -
 pkg/webservice/dao/allocation_info.go     |  1 -
 pkg/webservice/handlers.go                |  2 --
 10 files changed, 23 insertions(+), 43 deletions(-)

diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 3d514e67..2ecda58e 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -143,7 +143,7 @@ func (cc *ClusterContext) schedule() bool {
                        
metrics.GetSchedulerMetrics().ObserveSchedulingLatency(schedulingStart)
                        if alloc.GetResult() == objects.Replaced {
                                // communicate the removal to the RM
-                               cc.notifyRMAllocationReleased(psc.RmID, 
alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, "replacing 
allocationKey: "+alloc.GetAllocationKey())
+                               cc.notifyRMAllocationReleased(psc.RmID, 
psc.Name, alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, 
"replacing allocationKey: "+alloc.GetAllocationKey())
                        } else {
                                cc.notifyRMNewAllocation(psc.RmID, alloc)
                        }
@@ -564,7 +564,7 @@ func (cc *ClusterContext) 
handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
                        }
                        allocations := 
partition.removeApplication(app.ApplicationID)
                        if len(allocations) > 0 {
-                               cc.notifyRMAllocationReleased(partition.RmID, 
allocations, si.TerminationType_STOPPED_BY_RM,
+                               cc.notifyRMAllocationReleased(partition.RmID, 
partition.Name, allocations, si.TerminationType_STOPPED_BY_RM,
                                        fmt.Sprintf("Application %s Removed", 
app.ApplicationID))
                        }
                        log.Log(log.SchedContext).Info("Application removed 
from partition",
@@ -690,7 +690,7 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) 
{
                node.SendNodeRemovedEvent()
                // notify the shim allocations have been released from node
                if len(released) != 0 {
-                       cc.notifyRMAllocationReleased(partition.RmID, released, 
si.TerminationType_STOPPED_BY_RM,
+                       cc.notifyRMAllocationReleased(partition.RmID, 
partition.Name, released, si.TerminationType_STOPPED_BY_RM,
                                fmt.Sprintf("Node %s Removed", node.NodeID))
                }
                for _, confirm := range confirmed {
@@ -845,7 +845,7 @@ func (cc *ClusterContext) 
processAllocationReleases(releases []*si.AllocationRel
                        allocs, confirmed := 
partition.removeAllocation(toRelease)
                        // notify the RM of the exact released allocations
                        if len(allocs) > 0 {
-                               cc.notifyRMAllocationReleased(rmID, allocs, 
si.TerminationType_STOPPED_BY_RM, "allocation remove as per RM request")
+                               cc.notifyRMAllocationReleased(rmID, 
partition.Name, allocs, si.TerminationType_STOPPED_BY_RM, "allocation remove as 
per RM request")
                        }
                        // notify the RM of the confirmed allocations 
(placeholder swap & preemption)
                        if confirmed != nil {
@@ -887,7 +887,7 @@ func (cc *ClusterContext) notifyRMNewAllocation(rmID 
string, alloc *objects.Allo
 
 // Create a RM update event to notify RM of released allocations
 // Lock free call, all updates occur via events.
-func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, released 
[]*objects.Allocation, terminationType si.TerminationType, message string) {
+func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, 
partitionName string, released []*objects.Allocation, terminationType 
si.TerminationType, message string) {
        c := make(chan *rmevent.Result)
        releaseEvent := &rmevent.RMReleaseAllocationEvent{
                ReleasedAllocations: make([]*si.AllocationRelease, 0),
@@ -897,7 +897,7 @@ func (cc *ClusterContext) notifyRMAllocationReleased(rmID 
string, released []*ob
        for _, alloc := range released {
                releaseEvent.ReleasedAllocations = 
append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{
                        ApplicationID:   alloc.GetApplicationID(),
-                       PartitionName:   alloc.GetPartitionName(),
+                       PartitionName:   partitionName,
                        TerminationType: terminationType,
                        Message:         message,
                        AllocationKey:   alloc.GetAllocationKey(),
diff --git a/pkg/scheduler/objects/allocation.go 
b/pkg/scheduler/objects/allocation.go
index dd4c1f99..fff75116 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -25,7 +25,6 @@ import (
 
        "go.uber.org/zap"
 
-       "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/locking"
        "github.com/apache/yunikorn-core/pkg/log"
@@ -90,7 +89,6 @@ func NewAllocation(nodeID string, ask *AllocationAsk) 
*Allocation {
                createTime:        createTime,
                bindTime:          time.Now(),
                nodeID:            nodeID,
-               partitionName:     
common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
                tags:              ask.GetTagsClone(),
                priority:          ask.GetPriority(),
                allocatedResource: ask.GetAllocatedResource().Clone(),
@@ -139,7 +137,6 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
        ask := &AllocationAsk{
                allocationKey:     alloc.AllocationKey,
                applicationID:     alloc.ApplicationID,
-               partitionName:     alloc.PartitionName,
                allocatedResource: 
resources.NewResourceFromProto(alloc.ResourcePerAlloc),
                tags:              CloneAllocationTags(alloc.AllocationTags),
                priority:          alloc.Priority,
@@ -204,11 +201,6 @@ func (a *Allocation) GetApplicationID() string {
        return a.applicationID
 }
 
-// GetPartitionName returns the partition name for this allocation
-func (a *Allocation) GetPartitionName() string {
-       return a.partitionName
-}
-
 // GetTaskGroup returns the task group name for this allocation
 func (a *Allocation) GetTaskGroup() string {
        return a.taskGroupName
diff --git a/pkg/scheduler/objects/allocation_ask.go 
b/pkg/scheduler/objects/allocation_ask.go
index fc39a770..4701beae 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -36,7 +36,6 @@ type AllocationAsk struct {
        // Read-only fields
        allocationKey     string
        applicationID     string
-       partitionName     string
        taskGroupName     string    // task group this allocation ask belongs to
        placeholder       bool      // is this a placeholder allocation ask
        createTime        time.Time // the time this ask was created (used in 
reservations)
@@ -89,8 +88,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk) 
*AllocationAsk {
                allocationKey:     ask.AllocationKey,
                allocatedResource: 
resources.NewResourceFromProto(ask.ResourceAsk),
                applicationID:     ask.ApplicationID,
-               partitionName:     ask.PartitionName,
-
                tags:              CloneAllocationTags(ask.Tags),
                createTime:        time.Now(),
                priority:          ask.Priority,
@@ -132,11 +129,6 @@ func (aa *AllocationAsk) GetApplicationID() string {
        return aa.applicationID
 }
 
-// GetPartitionName returns the partition name for this ask
-func (aa *AllocationAsk) GetPartitionName() string {
-       return aa.partitionName
-}
-
 // allocate marks the ask as allocated and returns true if successful. An ask 
may not be allocated multiple times.
 func (aa *AllocationAsk) allocate() bool {
        aa.Lock()
diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 8b280891..5ef35dea 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -356,7 +356,7 @@ func (sa *Application) timeoutStateTimer(expectedState 
string, event application
                                        alloc.SetReleased(true)
                                        toRelease = append(toRelease, alloc)
                                }
-                               sa.notifyRMAllocationReleased(sa.rmID, 
toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
+                               sa.notifyRMAllocationReleased(toRelease, 
si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
                                sa.clearStateTimer()
                        } else {
                                // nolint: errcheck
@@ -430,7 +430,7 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                        zap.String("AppID", sa.ApplicationID),
                        zap.Int("placeholders being replaced", replacing),
                        zap.Int("releasing placeholders", len(toRelease)))
-               sa.notifyRMAllocationReleased(sa.rmID, toRelease, 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
+               sa.notifyRMAllocationReleased(toRelease, 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
        // Case 2: in every other case fail the application, and notify the 
context about the expired placeholder asks
        default:
                log.Log(log.SchedApplication).Info("Placeholder timeout, 
releasing asks and placeholders",
@@ -449,10 +449,10 @@ func (sa *Application) timeoutPlaceholderProcessing() {
                                zap.String("currentState", sa.CurrentState()),
                                zap.Error(err))
                }
-               sa.notifyRMAllocationAskReleased(sa.rmID, 
sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing 
placeholders asks on placeholder timeout")
+               sa.notifyRMAllocationAskReleased(sa.getAllRequestsInternal(), 
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder 
timeout")
                sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
                // all allocations are placeholders but GetAllAllocations is 
locked and cannot be used
-               sa.notifyRMAllocationReleased(sa.rmID, 
sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing 
allocated placeholders on placeholder timeout")
+               sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(), 
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder 
timeout")
                // we are in an accepted or new state so nothing can be 
replaced yet: mark everything as timedout
                for _, phData := range sa.placeholderData {
                        phData.TimedOut = phData.Count
@@ -1144,7 +1144,7 @@ func (sa *Application) 
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
                                        zap.Stringer("placeholder resource", 
ph.GetAllocatedResource()))
                                // release the placeholder and tell the RM
                                ph.SetReleased(true)
-                               sa.notifyRMAllocationReleased(sa.rmID, 
[]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource 
incompatible")
+                               
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, 
"cancel placeholder: resource incompatible")
                                sa.appEvents.sendPlaceholderLargerEvent(ph, 
request)
                                continue
                        }
@@ -1347,7 +1347,7 @@ func (sa *Application) tryRequiredNodePreemption(reserve 
*reservation, ask *Allo
                        victim.MarkPreempted()
                }
                ask.MarkTriggeredPreemption()
-               sa.notifyRMAllocationReleased(sa.rmID, victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+               sa.notifyRMAllocationReleased(victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
                        "preempting allocations to free up resources to run 
daemon set ask: "+ask.GetAllocationKey())
                return true
        }
@@ -1943,7 +1943,7 @@ func (sa *Application) executeTerminatedCallback() {
 // notifyRMAllocationReleased send an allocation release event to the RM to if 
the event handler is configured
 // and at least one allocation has been released.
 // No locking must be called while holding the lock
-func (sa *Application) notifyRMAllocationReleased(rmID string, released 
[]*Allocation, terminationType si.TerminationType, message string) {
+func (sa *Application) notifyRMAllocationReleased(released []*Allocation, 
terminationType si.TerminationType, message string) {
        // only generate event if needed
        if len(released) == 0 || sa.rmEventHandler == nil {
                return
@@ -1951,13 +1951,13 @@ func (sa *Application) notifyRMAllocationReleased(rmID 
string, released []*Alloc
        c := make(chan *rmevent.Result)
        releaseEvent := &rmevent.RMReleaseAllocationEvent{
                ReleasedAllocations: make([]*si.AllocationRelease, 0),
-               RmID:                rmID,
+               RmID:                sa.rmID,
                Channel:             c,
        }
        for _, alloc := range released {
                releaseEvent.ReleasedAllocations = 
append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{
                        ApplicationID:   alloc.GetApplicationID(),
-                       PartitionName:   alloc.GetPartitionName(),
+                       PartitionName:   sa.Partition,
                        AllocationKey:   alloc.GetAllocationKey(),
                        TerminationType: terminationType,
                        Message:         message,
@@ -1976,19 +1976,19 @@ func (sa *Application) notifyRMAllocationReleased(rmID 
string, released []*Alloc
 // notifyRMAllocationAskReleased send an ask release event to the RM to if the 
event handler is configured
 // and at least one ask has been released.
 // No locking must be called while holding the lock
-func (sa *Application) notifyRMAllocationAskReleased(rmID string, released 
[]*AllocationAsk, terminationType si.TerminationType, message string) {
+func (sa *Application) notifyRMAllocationAskReleased(released 
[]*AllocationAsk, terminationType si.TerminationType, message string) {
        // only generate event if needed
        if len(released) == 0 || sa.rmEventHandler == nil {
                return
        }
        releaseEvent := &rmevent.RMReleaseAllocationAskEvent{
                ReleasedAllocationAsks: make([]*si.AllocationAskRelease, 0),
-               RmID:                   rmID,
+               RmID:                   sa.rmID,
        }
        for _, alloc := range released {
                releaseEvent.ReleasedAllocationAsks = 
append(releaseEvent.ReleasedAllocationAsks, &si.AllocationAskRelease{
                        ApplicationID:   alloc.GetApplicationID(),
-                       PartitionName:   alloc.GetPartitionName(),
+                       PartitionName:   
common.GetPartitionNameWithoutClusterID(sa.Partition),
                        AllocationKey:   alloc.GetAllocationKey(),
                        TerminationType: terminationType,
                        Message:         message,
diff --git a/pkg/scheduler/objects/preemption.go 
b/pkg/scheduler/objects/preemption.go
index efac8fa1..e19929ed 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -569,7 +569,7 @@ func (p *Preemptor) TryPreemption() (*Allocation, bool) {
        p.ask.MarkTriggeredPreemption()
 
        // notify RM that victims should be released
-       p.application.notifyRMAllocationReleased(p.application.rmID, victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+       p.application.notifyRMAllocationReleased(victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
                "preempting allocations to free up resources to run ask: 
"+p.ask.GetAllocationKey())
 
        // reserve the selected node for the new allocation if it will fit
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index b36bfd5c..f2154136 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -46,7 +46,7 @@ import (
 
 type PartitionContext struct {
        RmID string // the RM the partition belongs to
-       Name string // name of the partition (logging mainly)
+       Name string // name of the partition
 
        // Private fields need protection
        root                   *objects.Queue                  // start of the 
queue hierarchy
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index dffae4e6..eabb1ad4 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2948,7 +2948,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
 
        // release placeholder: do what the context would do after the shim 
processing
        release := &si.AllocationRelease{
-               PartitionName:   ph.GetPartitionName(),
+               PartitionName:   partition.Name,
                ApplicationID:   appID1,
                AllocationKey:   ph.GetAllocationKey(),
                TerminationType: si.TerminationType_TIMEOUT,
@@ -3020,7 +3020,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
        for id, ph := range phs {
                assert.Assert(t, ph.IsReleased(), "placeholder %s should be 
released", id)
                release := &si.AllocationRelease{
-                       PartitionName:   ph.GetPartitionName(),
+                       PartitionName:   partition.Name,
                        ApplicationID:   appID1,
                        AllocationKey:   ph.GetAllocationKey(),
                        TerminationType: si.TerminationType_TIMEOUT,
@@ -3087,7 +3087,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) {
 
        // replace the placeholder: do what the context would do after the shim 
processing
        release := &si.AllocationRelease{
-               PartitionName:   ph.GetPartitionName(),
+               PartitionName:   partition.Name,
                ApplicationID:   appID1,
                AllocationKey:   ph.GetAllocationKey(),
                TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
diff --git a/pkg/webservice/dao/allocation_ask_info.go 
b/pkg/webservice/dao/allocation_ask_info.go
index 5f865b6d..8516047e 100644
--- a/pkg/webservice/dao/allocation_ask_info.go
+++ b/pkg/webservice/dao/allocation_ask_info.go
@@ -32,7 +32,6 @@ type AllocationAskDAOInfo struct {
        Priority            string                     
`json:"priority,omitempty"`
        RequiredNodeID      string                     
`json:"requiredNodeId,omitempty"`
        ApplicationID       string                     
`json:"applicationId,omitempty"`
-       Partition           string                     
`json:"partition,omitempty"`
        Placeholder         bool                       
`json:"placeholder,omitempty"`
        TaskGroupName       string                     
`json:"taskGroupName,omitempty"`
        AllocationLog       []*AllocationAskLogDAOInfo 
`json:"allocationLog,omitempty"`
diff --git a/pkg/webservice/dao/allocation_info.go 
b/pkg/webservice/dao/allocation_info.go
index 8c9ea2ff..aec6b994 100644
--- a/pkg/webservice/dao/allocation_info.go
+++ b/pkg/webservice/dao/allocation_info.go
@@ -28,7 +28,6 @@ type AllocationDAOInfo struct {
        Priority         string            `json:"priority,omitempty"`
        NodeID           string            `json:"nodeId,omitempty"`
        ApplicationID    string            `json:"applicationId,omitempty"`
-       Partition        string            `json:"partition,omitempty"`
        Placeholder      bool              `json:"placeholder,omitempty"`
        PlaceholderUsed  bool              `json:"placeholderUsed,omitempty"`
        TaskGroupName    string            `json:"taskGroupName,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 04c70c50..5ccc31a5 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -226,7 +226,6 @@ func getAllocationDAO(alloc *objects.Allocation) 
*dao.AllocationDAOInfo {
                Priority:         strconv.Itoa(int(alloc.GetPriority())),
                NodeID:           alloc.GetNodeID(),
                ApplicationID:    alloc.GetApplicationID(),
-               Partition:        alloc.GetPartitionName(),
                Preempted:        alloc.IsPreempted(),
        }
        return allocDAO
@@ -327,7 +326,6 @@ func getAllocationAskDAO(ask *objects.AllocationAsk) 
*dao.AllocationAskDAOInfo {
                Priority:            strconv.Itoa(int(ask.GetPriority())),
                RequiredNodeID:      ask.GetRequiredNode(),
                ApplicationID:       ask.GetApplicationID(),
-               Partition:           
common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
                Placeholder:         ask.IsPlaceholder(),
                TaskGroupName:       ask.GetTaskGroup(),
                AllocationLog:       
getAllocationLogsDAO(ask.GetAllocationLog()),


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to