This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 59ce2f7e YUNIKORN-1706 We should clean up failed apps in shim side (#730)
59ce2f7e is described below

commit 59ce2f7e1609b6c2a594c09f008a33e458bf89ea
Author: qzhu <q...@cloudera.com>
AuthorDate: Mon Feb 26 16:36:51 2024 +0100

    YUNIKORN-1706 We should clean up failed apps in shim side (#730)

    Closes: #730

    Signed-off-by: Peter Bacsko <pbac...@cloudera.com>

    (cherry picked from commit aff0677f1e37f355734f1cfbabf8f23594d2504f)
---
 pkg/cache/application.go        |  4 ++++
 pkg/cache/application_test.go   |  4 ++++
 pkg/shim/scheduler.go           |  7 +++++++
 pkg/shim/scheduler_mock_test.go |  7 +++++++
 pkg/shim/scheduler_test.go      | 10 +++++-----
 5 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 0e6385c5..bb5bba2b 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -320,6 +320,10 @@ func (app *Application) getNonTerminatedTaskAlias() []string {
 	return nonTerminatedTaskAlias
 }

+func (app *Application) AreAllTasksTerminated() bool {
+	return len(app.getNonTerminatedTaskAlias()) == 0
+}
+
 // SetState is only for testing
 // this is just used for testing, it is not supposed to change state like this
 func (app *Application) SetState(state string) {
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index e0263c2e..03a572fc 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -532,6 +532,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
 	// app doesn't have any task
 	res := app.getNonTerminatedTaskAlias()
 	assert.Equal(t, len(res), 0)
+	assert.Equal(t, app.AreAllTasksTerminated(), true)

 	pod1 := &v1.Pod{
 		TypeMeta: apis.TypeMeta{
@@ -566,6 +567,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
 	// res should return both task's alias
 	res = app.getNonTerminatedTaskAlias()
 	assert.Equal(t, len(res), 2)
+	assert.Equal(t, app.AreAllTasksTerminated(), false)
 	assert.Assert(t, is.Contains(res, "/test-00001"))
 	assert.Assert(t, is.Contains(res, "/test-00002"))

@@ -576,12 +578,14 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
 	// res should retuen empty
 	res = app.getNonTerminatedTaskAlias()
 	assert.Equal(t, len(res), 0)
+	assert.Equal(t, app.AreAllTasksTerminated(), true)

 	// set two tasks to one is terminated, another is non-terminated
 	task1.sm.SetState(TaskStates().Rejected)
 	task2.sm.SetState(TaskStates().Allocated)
 	// check the task, should only return task2's alias
 	res = app.getNonTerminatedTaskAlias()
+	assert.Equal(t, app.AreAllTasksTerminated(), false)
 	assert.Equal(t, len(res), 1)
 	assert.Equal(t, res[0], "/test-00002")
 }
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index 76a2c717..cb53bb3f 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -158,6 +158,13 @@ func (ss *KubernetesShim) registerShimLayer() error {
 func (ss *KubernetesShim) schedule() {
 	apps := ss.context.GetAllApplications()
 	for _, app := range apps {
+		if app.GetApplicationState() == cache.ApplicationStates().Failed {
+			if app.AreAllTasksTerminated() {
+				ss.context.RemoveApplicationInternal(app.GetApplicationID())
+			}
+			continue
+		}
+
 		if app.Schedule() {
 			ss.setOutstandingAppsFound(true)
 		}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 54c2f75b..51253b1d 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -128,6 +128,13 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
 	return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }

+func (fc *MockScheduler) waitForApplicationDeletion(t *testing.T, appID string) {
+	err := utils.WaitForCondition(func() bool {
+		return fc.context.GetApplication(appID) == nil
+	}, time.Second, 5*time.Second)
+	assert.NilError(t, err, "application has not been deleted")
+}
+
 func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expectedState string) {
 	deadline := time.Now().Add(10 * time.Second)
 	for {
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index bbb1d9ad..9651cd0c 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -144,11 +144,11 @@ partitions:
 	// verify app state
 	cluster.waitAndAssertApplicationState(t, appID, cache.ApplicationStates().Failed)

-	// remove the application
-	// remove task first or removeApplication will fail
-	cluster.context.RemoveTask(appID, "task0001")
-	err = cluster.removeApplication(appID)
-	assert.Assert(t, err == nil)
+	// make the task terminal state
+	cluster.DeletePod(task1)
+	cluster.waitAndAssertTaskState(t, "app0001", "task0001", cache.TaskStates().Completed)
+	// make sure the shim side has clean up the failed app
+	cluster.waitForApplicationDeletion(t, appID)

 	// submit again
 	task1 = createTestPod("root.a", appID, "task0001", taskResource)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org