This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push: new 982bc48f [YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846) 982bc48f is described below commit 982bc48f890bc786e2edb450b67f6952e68a98f3 Author: Peter Bacsko <pbac...@cloudera.com> AuthorDate: Mon Apr 22 11:19:40 2024 +0200 [YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846) Closes: #846 Signed-off-by: Peter Bacsko <pbac...@cloudera.com> --- pkg/scheduler/objects/application.go | 35 +++--- pkg/scheduler/objects/application_test.go | 13 +++ pkg/scheduler/partition_test.go | 66 +++++++++++ pkg/scheduler/tests/mock_rm_callback_test.go | 31 +++++ pkg/scheduler/tests/recovery_test.go | 165 +++++++++++++++++++++++++++ 5 files changed, 291 insertions(+), 19 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 74f7c5dc..e6940680 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -638,25 +638,13 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error { zap.Error(err)) } } - sa.requests[ask.GetAllocationKey()] = ask - - // update app priority - repeat := ask.GetPendingAskRepeat() - priority := ask.GetPriority() - if repeat > 0 && priority > sa.askMaxPriority { - sa.askMaxPriority = priority - sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority) - } + sa.addAllocationAskInternal(ask) // Update total pending resource delta.SubFrom(oldAskResource) sa.pending = resources.Add(sa.pending, delta) sa.queue.incPendingResource(delta) - if ask.IsPlaceholder() { - sa.addPlaceholderData(ask) - } - log.Log(log.SchedApplication).Info("ask added successfully to application", zap.String("appID", sa.ApplicationID), zap.String("user", sa.user.User), @@ -678,6 +666,19 @@ func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) { if ask == nil { return } + + sa.addAllocationAskInternal(ask) + + // progress the application from New to Accepted. + if sa.IsNew() { + if err := sa.HandleApplicationEvent(RunApplication); err != nil { + log.Log(log.SchedApplication).Debug("Application state change failed while recovering allocation ask", + zap.Error(err)) + } + } +} + +func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) { sa.requests[ask.GetAllocationKey()] = ask // update app priority @@ -688,12 +689,8 @@ func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) { sa.queue.UpdateApplicationPriority(sa.ApplicationID, sa.askMaxPriority) } - // progress the application from New to Accepted. - if sa.IsNew() { - if err := sa.HandleApplicationEvent(RunApplication); err != nil { - log.Log(log.SchedApplication).Debug("Application state change failed while recovering allocation ask", - zap.Error(err)) - } + if ask.IsPlaceholder() { + sa.addPlaceholderData(ask) } } diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index ad0dc58c..2300d729 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -524,6 +524,19 @@ func TestRecoverAllocAsk(t *testing.T) { assert.Equal(t, len(app.requests), 2, "ask should have been added, total should be 2") assert.Assert(t, app.IsAccepted(), "Application should have stayed in accepted state") assertUserGroupResource(t, getTestUserGroup(), nil) + + assert.Equal(t, 0, len(app.placeholderData)) + ask = newAllocationAskTG("ask-3", appID1, "testGroup", res, 1) + app.RecoverAllocationAsk(ask) + phData := app.placeholderData + assert.Equal(t, 1, len(phData)) + taskGroupData := phData["testGroup"] + assert.Assert(t, taskGroupData != nil) + assert.Equal(t, "testGroup", taskGroupData.TaskGroupName) + assert.Equal(t, int64(1), taskGroupData.Count) + assert.Equal(t, int64(0), taskGroupData.Replaced) + assert.Equal(t, int64(0), taskGroupData.TimedOut) + assert.Assert(t, resources.Equals(taskGroupData.MinResource, res)) } // test reservations removal by allocation diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 3baa40cc..7b4d0583 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -4423,3 +4423,69 @@ func TestCalculateOutstandingRequests(t *testing.T) { assert.Equal(t, 3, len(requests)) assert.Assert(t, resources.Equals(expectedTotal, total), "total resource expected: %v, got: %v", expectedTotal, total) } + +func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { + // verify the following (YUNIKORN-2562): + // 1. Have a recovered, existing PH allocation (ph-1) from a node with task group "tg-1" + // 2. Have a new PH ask (ph-2) with task group "tg-2" + // 3. Have a real ask with task group "tg-1" + // 4. EXPECTED: successful allocation for the pending ask (ph-2) + // 5. EXPECTED: successful placeholder allocation (replacement) + // 6. EXPECTED: successful removal of ph-1 allocation + setupUGM() + partition, err := newBasePartition() + assert.NilError(t, err, "partition create failed") + + // add a new app + app := newApplication(appID1, "default", defQueue) + err = partition.AddApplication(app) + assert.NilError(t, err, "add application to partition should not have failed") + + // add a node with allocation + nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) + node1 := newNodeMaxResource(nodeID1, nodeRes) + appRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) + phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes, true) + ph := objects.NewAllocation(nodeID1, phAsk) + allocs := []*objects.Allocation{ph} + err = partition.AddNode(node1, allocs) + assert.NilError(t, err) + + // add a placeholder ask with a different taskgroup + phAsk2 := newAllocationAskTG("placeholder2", appID1, "tg-2", appRes, true) + err = app.AddAllocationAsk(phAsk2) + assert.NilError(t, err, "failed to add placeholder ask") + + realAsk := newAllocationAskTG("real-alloc", appID1, taskGroup, appRes, false) + err = app.AddAllocationAsk(realAsk) + assert.NilError(t, err, "failed to add real ask") + + // get an allocation for "placeholder2" + alloc := partition.tryAllocate() + assert.Assert(t, alloc != nil, "no allocation occurred") + assert.Equal(t, objects.Allocated, alloc.GetResult()) + assert.Equal(t, "placeholder2", alloc.GetAllocationKey()) + assert.Equal(t, "tg-2", alloc.GetTaskGroup()) + assert.Equal(t, "node-1", alloc.GetNodeID()) + + // real allocation gets replaced + alloc = partition.tryPlaceholderAllocate() + assert.Assert(t, alloc != nil, "no placeholder replacement occurred") + assert.Equal(t, objects.Replaced, alloc.GetResult()) + assert.Equal(t, "real-alloc", alloc.GetAllocationKey()) + assert.Equal(t, "tg-1", alloc.GetTaskGroup()) + assert.Equal(t, "real-alloc-0", alloc.GetAllocationID()) + + // remove the terminated placeholder allocation + released, confirmed := partition.removeAllocation(&si.AllocationRelease{ + ApplicationID: appID1, + TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, + AllocationKey: "real-alloc-0", + AllocationID: "placeholder-0", + }) + assert.Assert(t, released == nil, "unexpected released allocation") + assert.Assert(t, confirmed != nil, "expected to have a confirmed allocation") + assert.Equal(t, "real-alloc", confirmed.GetAllocationKey()) + assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) + assert.Equal(t, "real-alloc-0", confirmed.GetAllocationID()) +} diff --git a/pkg/scheduler/tests/mock_rm_callback_test.go b/pkg/scheduler/tests/mock_rm_callback_test.go index f9212a00..a908683d 100644 --- a/pkg/scheduler/tests/mock_rm_callback_test.go +++ b/pkg/scheduler/tests/mock_rm_callback_test.go @@ -38,6 +38,8 @@ type mockRMCallback struct { rejectedNodes map[string]bool nodeAllocations map[string][]*si.Allocation Allocations map[string]*si.Allocation + releasedPhs map[string]*si.AllocationRelease + appStates map[string]string locking.RWMutex } @@ -50,6 +52,8 @@ func newMockRMCallbackHandler() *mockRMCallback { rejectedNodes: make(map[string]bool), nodeAllocations: make(map[string][]*si.Allocation), Allocations: make(map[string]*si.Allocation), + releasedPhs: make(map[string]*si.AllocationRelease), + appStates: make(map[string]string), } } @@ -63,6 +67,10 @@ func (m *mockRMCallback) UpdateApplication(response *si.ApplicationResponse) err for _, app := range response.Rejected { m.rejectedApplications[app.ApplicationID] = true delete(m.acceptedApplications, app.ApplicationID) + delete(m.appStates, app.ApplicationID) + } + for _, app := range response.Updated { + m.appStates[app.ApplicationID] = app.State } return nil } @@ -83,6 +91,9 @@ func (m *mockRMCallback) UpdateAllocation(response *si.AllocationResponse) error } for _, alloc := range response.Released { delete(m.Allocations, alloc.AllocationID) + if alloc.TerminationType == si.TerminationType_PLACEHOLDER_REPLACED { + m.releasedPhs[alloc.AllocationID] = alloc + } } return nil } @@ -132,6 +143,15 @@ func (m *mockRMCallback) waitForRejectedApplication(t *testing.T, appID string, assert.NilError(t, err, "Failed to wait for rejected application: %s, called from: %s", appID, caller()) } +func (m *mockRMCallback) waitForApplicationState(t *testing.T, appID, state string, timeoutMs int) { + err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool { + m.RLock() + defer m.RUnlock() + return m.appStates[appID] == state + }) + assert.NilError(t, err, "Failed to wait for application %s state: %s, called from: %s", appID, state, caller()) +} + func (m *mockRMCallback) waitForAcceptedNode(t *testing.T, nodeID string, timeoutMs int) { err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool { m.RLock() @@ -186,3 +206,14 @@ func (m *mockRMCallback) waitForMinAllocations(tb testing.TB, nAlloc int, timeou tb.Fatalf("Failed to wait for min allocations expected %d, actual %d, called from: %s", nAlloc, allocLen, caller()) } } + +func (m *mockRMCallback) waitForReleasedPlaceholders(t *testing.T, releases int, timeoutMs int) { + var releasesLen int + err := common.WaitFor(10*time.Millisecond, time.Duration(timeoutMs)*time.Millisecond, func() bool { + m.RLock() + defer m.RUnlock() + releasesLen = len(m.releasedPhs) + return releasesLen == releases + }) + assert.NilError(t, err, "Failed to wait for placeholder releases, expected %d, actual %d, called from: %s", releases, releasesLen, caller()) +} diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go index 6a7b8d26..09e0f26e 100644 --- a/pkg/scheduler/tests/recovery_test.go +++ b/pkg/scheduler/tests/recovery_test.go @@ -931,3 +931,168 @@ partitions: appQueue = part.GetQueue("root.app-1-namespace") assert.Assert(t, appQueue != nil, "application queue was not created after recovery") } + +func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen + // create an existing allocation + existingAllocations := make([]*si.Allocation, 1) + existingAllocations[0] = &si.Allocation{ + AllocationKey: "ph-alloc-1", + NodeID: "node-1:1234", + ApplicationID: appID1, + TaskGroupName: "tg-1", + AllocationID: "ph-alloc-1-0", + ResourcePerAlloc: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": { + Value: 10, + }, + "vcore": { + Value: 1, + }, + }, + }, + Placeholder: true, + } + + config := `partitions: + - name: default + queues: + - name: root + submitacl: "*" + queues: + - name: default` + ms := &mockScheduler{} + defer ms.Stop() + err := ms.Init(config, true, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + // Add application + err = ms.proxy.UpdateApplication(&si.ApplicationRequest{ + New: newAddAppRequest(map[string]string{appID1: "root.default"}), + RmID: "rm:123", + }) + assert.NilError(t, err, "ApplicationRequest failed") + ms.mockRM.waitForAcceptedApplication(t, appID1, 1000) + + // Add node + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 100}, + "vcore": {Value: 20}, + }, + }, + Action: si.NodeInfo_CREATE, + ExistingAllocations: existingAllocations, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest nodes and app for recovery failed") + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + + // Add a new placeholder ask with a different task group + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: []*si.AllocationAsk{ + { + AllocationKey: "ph-alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + MaxAllocations: 1, + ApplicationID: appID1, + TaskGroupName: "tg-2", + Placeholder: true, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "AllocationRequest failed for placeholder ask") + ms.mockRM.waitForAllocations(t, 1, 1000) + + // Add two real asks + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Asks: []*si.AllocationAsk{ + { + AllocationKey: "real-alloc-1", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + MaxAllocations: 1, + ApplicationID: appID1, + TaskGroupName: "tg-1", + }, + { + AllocationKey: "real-alloc-2", + ResourceAsk: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 10}, + "vcore": {Value: 1}, + }, + }, + MaxAllocations: 1, + ApplicationID: appID1, + TaskGroupName: "tg-2", + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "AllocationRequest failed for real asks") + ms.mockRM.waitForReleasedPlaceholders(t, 2, 1000) + + // remove placeholder allocations + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Releases: &si.AllocationReleasesRequest{ + AllocationsToRelease: []*si.AllocationRelease{ + { + ApplicationID: appID1, + PartitionName: "default", + AllocationID: "ph-alloc-1-0", + TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, + }, + { + ApplicationID: appID1, + PartitionName: "default", + AllocationID: "ph-alloc-2-0", + TerminationType: si.TerminationType_PLACEHOLDER_REPLACED, + }, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "AllocationReleasesRequest failed for placeholders") + + // remove real allocations + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Releases: &si.AllocationReleasesRequest{ + AllocationsToRelease: []*si.AllocationRelease{ + { + ApplicationID: appID1, + PartitionName: "default", + AllocationID: "real-alloc-1-0", + TerminationType: si.TerminationType_STOPPED_BY_RM, + }, + { + ApplicationID: appID1, + PartitionName: "default", + AllocationID: "real-alloc-2-0", + TerminationType: si.TerminationType_STOPPED_BY_RM, + }, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "AllocationReleasesRequest failed for real allocations") + + ms.mockRM.waitForApplicationState(t, appID1, "Completing", 1000) +} --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org