This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 982bc48f [YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846)
982bc48f is described below

commit 982bc48f890bc786e2edb450b67f6952e68a98f3
Author: Peter Bacsko <pbac...@cloudera.com>
AuthorDate: Mon Apr 22 11:19:40 2024 +0200

    [YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846)
    
    Closes: #846
    
    Signed-off-by: Peter Bacsko <pbac...@cloudera.com>
---
 pkg/scheduler/objects/application.go         |  35 +++---
 pkg/scheduler/objects/application_test.go    |  13 +++
 pkg/scheduler/partition_test.go              |  66 +++++++++++
 pkg/scheduler/tests/mock_rm_callback_test.go |  31 +++++
 pkg/scheduler/tests/recovery_test.go         | 165 +++++++++++++++++++++++++++
 5 files changed, 291 insertions(+), 19 deletions(-)

diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 74f7c5dc..e6940680 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -638,25 +638,13 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
                                zap.Error(err))
                }
        }
-       sa.requests[ask.GetAllocationKey()] = ask
-
-       // update app priority
-       repeat := ask.GetPendingAskRepeat()
-       priority := ask.GetPriority()
-       if repeat > 0 && priority > sa.askMaxPriority {
-               sa.askMaxPriority = priority
-               sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
-       }
+       sa.addAllocationAskInternal(ask)
 
        // Update total pending resource
        delta.SubFrom(oldAskResource)
        sa.pending = resources.Add(sa.pending, delta)
        sa.queue.incPendingResource(delta)
 
-       if ask.IsPlaceholder() {
-               sa.addPlaceholderData(ask)
-       }
-
        log.Log(log.SchedApplication).Info("ask added successfully to application",
                zap.String("appID", sa.ApplicationID),
                zap.String("user", sa.user.User),
@@ -678,6 +666,19 @@ func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
        if ask == nil {
                return
        }
+
+       sa.addAllocationAskInternal(ask)
+
+       // progress the application from New to Accepted.
+       if sa.IsNew() {
+               if err := sa.HandleApplicationEvent(RunApplication); err != nil {
+                       log.Log(log.SchedApplication).Debug("Application state change failed while recovering allocation ask",
+                               zap.Error(err))
+               }
+       }
+}
+
+func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) {
        sa.requests[ask.GetAllocationKey()] = ask
 
        // update app priority
@@ -688,12 +689,8 @@ func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
                sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority)
        }
 
-       // progress the application from New to Accepted.
-       if sa.IsNew() {
-               if err := sa.HandleApplicationEvent(RunApplication); err != nil {
-                       log.Log(log.SchedApplication).Debug("Application state change failed while recovering allocation ask",
-                               zap.Error(err))
-               }
+       if ask.IsPlaceholder() {
+               sa.addPlaceholderData(ask)
        }
 }
 
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index ad0dc58c..2300d729 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -524,6 +524,19 @@ func TestRecoverAllocAsk(t *testing.T) {
        assert.Equal(t, len(app.requests), 2, "ask should have been added, total should be 2")
        assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state")
        assertUserGroupResource(t, getTestUserGroup(), nil)
+
+       assert.Equal(t, 0, len(app.placeholderData))
+       ask = newAllocationAskTG("ask-3", appID1, "testGroup", res, 1)
+       app.RecoverAllocationAsk(ask)
+       phData := app.placeholderData
+       assert.Equal(t, 1, len(phData))
+       taskGroupData := phData["testGroup"]
+       assert.Assert(t, taskGroupData != nil)
+       assert.Equal(t, "testGroup", taskGroupData.TaskGroupName)
+       assert.Equal(t, int64(1), taskGroupData.Count)
+       assert.Equal(t, int64(0), taskGroupData.Replaced)
+       assert.Equal(t, int64(0), taskGroupData.TimedOut)
+       assert.Assert(t, resources.Equals(taskGroupData.MinResource, res))
 }
 
 // test reservations removal by allocation
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 3baa40cc..7b4d0583 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4423,3 +4423,69 @@ func TestCalculateOutstandingRequests(t *testing.T) {
        assert.Equal(t, 3, len(requests))
        assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total)
 }
+
+func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
+       // verify the following (YUNIKORN-2562):
+       // 1. Have a recovered, existing PH allocation (ph-1) from a node with task group "tg-1"
+       // 2. Have a new PH ask (ph-2) with task group "tg-2"
+       // 3. Have a real ask with task group "tg-1"
+       // 4. EXPECTED: successful allocation for the pending ask (ph-2)
+       // 5. EXPECTED: successful placeholder allocation (replacement)
+       // 6. EXPECTED: successful removal of ph-1 allocation
+       setupUGM()
+       partition, err := newBasePartition()
+       assert.NilError(t, err, "partition create failed")
+
+       // add a new app
+       app := newApplication(appID1, "default", defQueue)
+       err = partition.AddApplication(app)
+       assert.NilError(t, err, "add application to partition should not have failed")
+
+       // add a node with allocation
+       nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       node1 := newNodeMaxResource(nodeID1, nodeRes)
+       appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes, true)
+       ph := objects.NewAllocation(nodeID1, phAsk)
+       allocs := []*objects.Allocation{ph}
+       err = partition.AddNode(node1, allocs)
+       assert.NilError(t, err)
+
+       // add a placeholder ask with a different taskgroup
+       phAsk2 := newAllocationAskTG("placeholder2", appID1, "tg-2", appRes, true)
+       err = app.AddAllocationAsk(phAsk2)
+       assert.NilError(t, err, "failed to add placeholder ask")
+
+       realAsk := newAllocationAskTG("real-alloc", appID1, taskGroup, appRes, false)
+       err = app.AddAllocationAsk(realAsk)
+       assert.NilError(t, err, "failed to add real ask")
+
+       // get an allocation for "placeholder2"
+       alloc := partition.tryAllocate()
+       assert.Assert(t, alloc != nil, "no allocation occurred")
+       assert.Equal(t, objects.Allocated, alloc.GetResult())
+       assert.Equal(t, "placeholder2", alloc.GetAllocationKey())
+       assert.Equal(t, "tg-2", alloc.GetTaskGroup())
+       assert.Equal(t, "node-1", alloc.GetNodeID())
+
+       // real allocation gets replaced
+       alloc = partition.tryPlaceholderAllocate()
+       assert.Assert(t, alloc != nil, "no placeholder replacement occurred")
+       assert.Equal(t, objects.Replaced, alloc.GetResult())
+       assert.Equal(t, "real-alloc", alloc.GetAllocationKey())
+       assert.Equal(t, "tg-1", alloc.GetTaskGroup())
+       assert.Equal(t, "real-alloc-0", alloc.GetAllocationID())
+
+       // remove the terminated placeholder allocation
+       released, confirmed := partition.removeAllocation(&si.AllocationRelease{
+               ApplicationID:   appID1,
+               TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
+               AllocationKey:   "real-alloc-0",
+               AllocationID:    "placeholder-0",
+       })
+       assert.Assert(t, released == nil, "unexpected released allocation")
+       assert.Assert(t, confirmed != nil, "expected to have a confirmed allocation")
+       assert.Equal(t, "real-alloc", confirmed.GetAllocationKey())
+       assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
+       assert.Equal(t, "real-alloc-0", confirmed.GetAllocationID())
+}
diff --git a/pkg/scheduler/tests/mock_rm_callback_test.go b/pkg/scheduler/tests/mock_rm_callback_test.go
index f9212a00..a908683d 100644
--- a/pkg/scheduler/tests/mock_rm_callback_test.go
+++ b/pkg/scheduler/tests/mock_rm_callback_test.go
@@ -38,6 +38,8 @@ type mockRMCallback struct {
        rejectedNodes        map[string]bool
        nodeAllocations      map[string][]*si.Allocation
        Allocations          map[string]*si.Allocation
+       releasedPhs          map[string]*si.AllocationRelease
+       appStates            map[string]string
 
        locking.RWMutex
 }
@@ -50,6 +52,8 @@ func newMockRMCallbackHandler() *mockRMCallback {
                rejectedNodes:        make(map[string]bool),
                nodeAllocations:      make(map[string][]*si.Allocation),
                Allocations:          make(map[string]*si.Allocation),
+               releasedPhs:          make(map[string]*si.AllocationRelease),
+               appStates:            make(map[string]string),
        }
 }
 
@@ -63,6 +67,10 @@ func (m *mockRMCallback) UpdateApplication(response *si.ApplicationResponse) err
        for _, app := range response.Rejected {
                m.rejectedApplications[app.ApplicationID] = true
                delete(m.acceptedApplications, app.ApplicationID)
+               delete(m.appStates, app.ApplicationID)
+       }
+       for _, app := range response.Updated {
+               m.appStates[app.ApplicationID] = app.State
        }
        return nil
 }
@@ -83,6 +91,9 @@ func (m *mockRMCallback) UpdateAllocation(response *si.AllocationResponse) error
        }
        for _, alloc := range response.Released {
                delete(m.Allocations, alloc.AllocationID)
+               if alloc.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED {
+                       m.releasedPhs[alloc.AllocationID] = alloc
+               }
        }
        return nil
 }
@@ -132,6 +143,15 @@ func (m *mockRMCallback) waitForRejectedApplication(t *testing.T, appID string,
        assert.NilError(t, err, "Failed to wait for rejected application: %s, called from: %s", appID, caller())
 }
 
+func (m *mockRMCallback) waitForApplicationState(t *testing.T, appID, state string, timeoutMs int) {
+       err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool {
+               m.RLock()
+               defer m.RUnlock()
+               return m.appStates[appID] == state
+       })
+       assert.NilError(t, err, "Failed to wait for application %s state: %s, called from: %s", appID, state, caller())
+}
+
 func (m *mockRMCallback) waitForAcceptedNode(t *testing.T, nodeID string, timeoutMs int) {
        err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool {
                m.RLock()
@@ -186,3 +206,14 @@ func (m *mockRMCallback) waitForMinAllocations(tb testing.TB, nAlloc int, timeou
                tb.Fatalf("Failed to wait for min allocations expected %d, actual %d, called from: %s", nAlloc, allocLen, caller())
        }
 }
+
+func (m *mockRMCallback) waitForReleasedPlaceholders(t *testing.T, releases int, timeoutMs int) {
+       var releasesLen int
+       err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool {
+               m.RLock()
+               defer m.RUnlock()
+               releasesLen = len(m.releasedPhs)
+               return releasesLen == releases
+       })
+       assert.NilError(t, err, "Failed to wait for placeholder releases, expected %d, actual %d, called from: %s", releases, releasesLen, caller())
+}
diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go
index 6a7b8d26..09e0f26e 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -931,3 +931,168 @@ partitions:
        appQueue = part.GetQueue("root.app-1-namespace")
        assert.Assert(t, appQueue != nil, "application queue was not created after recovery")
 }
+
+func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
+       // create an existing allocation
+       existingAllocations := make([]*si.Allocation, 1)
+       existingAllocations[0] = &si.Allocation{
+               AllocationKey: "ph-alloc-1",
+               NodeID:        "node-1:1234",
+               ApplicationID: appID1,
+               TaskGroupName: "tg-1",
+               AllocationID:  "ph-alloc-1-0",
+               ResourcePerAlloc: &si.Resource{
+                       Resources: map[string]*si.Quantity{
+                               "memory": {
+                                       Value: 10,
+                               },
+                               "vcore": {
+                                       Value: 1,
+                               },
+                       },
+               },
+               Placeholder: true,
+       }
+
+       config := `partitions:
+  - name: default
+    queues:
+      - name: root
+        submitacl: "*"
+        queues:
+          - name: default`
+       ms := &mockScheduler{}
+       defer ms.Stop()
+       err := ms.Init(config, true, false)
+       assert.NilError(t, err, "RegisterResourceManager failed")
+
+       // Add application
+       err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
+               New:  newAddAppRequest(map[string]string{appID1: "root.default"}),
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "ApplicationRequest failed")
+       ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+
+       // Add node
+       err = ms.proxy.UpdateNode(&si.NodeRequest{
+               Nodes: []*si.NodeInfo{
+                       {
+                               NodeID:     "node-1:1234",
+                               Attributes: map[string]string{},
+                               SchedulableResource: &si.Resource{
+                                       Resources: map[string]*si.Quantity{
+                                               "memory": {Value: 100},
+                                               "vcore":  {Value: 20},
+                                       },
+                               },
+                               Action:              si.NodeInfo_CREATE,
+                               ExistingAllocations: existingAllocations,
+                       },
+               },
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
+       ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+
+       // Add a new placeholder ask with a different task group
+       err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+               Asks: []*si.AllocationAsk{
+                       {
+                               AllocationKey: "ph-alloc-2",
+                               ResourceAsk: &si.Resource{
+                                       Resources: map[string]*si.Quantity{
+                                               "memory": {Value: 10},
+                                               "vcore":  {Value: 1},
+                                       },
+                               },
+                               MaxAllocations: 1,
+                               ApplicationID:  appID1,
+                               TaskGroupName:  "tg-2",
+                               Placeholder:    true,
+                       },
+               },
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "AllocationRequest failed for placeholder ask")
+       ms.mockRM.waitForAllocations(t, 1, 1000)
+
+       // Add two real asks
+       err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+               Asks: []*si.AllocationAsk{
+                       {
+                               AllocationKey: "real-alloc-1",
+                               ResourceAsk: &si.Resource{
+                                       Resources: map[string]*si.Quantity{
+                                               "memory": {Value: 10},
+                                               "vcore":  {Value: 1},
+                                       },
+                               },
+                               MaxAllocations: 1,
+                               ApplicationID:  appID1,
+                               TaskGroupName:  "tg-1",
+                       },
+                       {
+                               AllocationKey: "real-alloc-2",
+                               ResourceAsk: &si.Resource{
+                                       Resources: map[string]*si.Quantity{
+                                               "memory": {Value: 10},
+                                               "vcore":  {Value: 1},
+                                       },
+                               },
+                               MaxAllocations: 1,
+                               ApplicationID:  appID1,
+                               TaskGroupName:  "tg-2",
+                       },
+               },
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "AllocationRequest failed for real asks")
+       ms.mockRM.waitForReleasedPlaceholders(t, 2, 1000)
+
+       // remove placeholder allocations
+       err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+               Releases: &si.AllocationReleasesRequest{
+                       AllocationsToRelease: []*si.AllocationRelease{
+                               {
+                                       ApplicationID:   appID1,
+                                       PartitionName:   "default",
+                                       AllocationID:    "ph-alloc-1-0",
+                                       TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
+                               },
+                               {
+                                       ApplicationID:   appID1,
+                                       PartitionName:   "default",
+                                       AllocationID:    "ph-alloc-2-0",
+                                       TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
+                               },
+                       },
+               },
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "AllocationReleasesRequest failed for placeholders")
+
+       // remove real allocations
+       err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+               Releases: &si.AllocationReleasesRequest{
+                       AllocationsToRelease: []*si.AllocationRelease{
+                               {
+                                       ApplicationID:   appID1,
+                                       PartitionName:   "default",
+                                       AllocationID:    "real-alloc-1-0",
+                                       TerminationType: si.TerminationType_STOPPED_BY_RM,
+                               },
+                               {
+                                       ApplicationID:   appID1,
+                                       PartitionName:   "default",
+                                       AllocationID:    "real-alloc-2-0",
+                                       TerminationType: si.TerminationType_STOPPED_BY_RM,
+                               },
+                       },
+               },
+               RmID: "rm:123",
+       })
+       assert.NilError(t, err, "AllocationReleasesRequest failed for real allocations")
+
+       ms.mockRM.waitForApplicationState(t, appID1, "Completing", 1000)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to