This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 59ce2f7e YUNIKORN-1706 We should clean up failed apps in shim side (#730)
59ce2f7e is described below

commit 59ce2f7e1609b6c2a594c09f008a33e458bf89ea
Author: qzhu <q...@cloudera.com>
AuthorDate: Mon Feb 26 16:36:51 2024 +0100

    YUNIKORN-1706 We should clean up failed apps in shim side (#730)
    
    Closes: #730
    
    Signed-off-by: Peter Bacsko <pbac...@cloudera.com>
    (cherry picked from commit aff0677f1e37f355734f1cfbabf8f23594d2504f)
---
 pkg/cache/application.go        |  4 ++++
 pkg/cache/application_test.go   |  4 ++++
 pkg/shim/scheduler.go           |  7 +++++++
 pkg/shim/scheduler_mock_test.go |  7 +++++++
 pkg/shim/scheduler_test.go      | 10 +++++-----
 5 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 0e6385c5..bb5bba2b 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -320,6 +320,10 @@ func (app *Application) getNonTerminatedTaskAlias() []string {
        return nonTerminatedTaskAlias
 }
 
+func (app *Application) AreAllTasksTerminated() bool {
+       return len(app.getNonTerminatedTaskAlias()) == 0
+}
+
 // SetState is only for testing
 // this is just used for testing, it is not supposed to change state like this
 func (app *Application) SetState(state string) {
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index e0263c2e..03a572fc 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -532,6 +532,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
        // app doesn't have any task
        res := app.getNonTerminatedTaskAlias()
        assert.Equal(t, len(res), 0)
+       assert.Equal(t, app.AreAllTasksTerminated(), true)
 
        pod1 := &v1.Pod{
                TypeMeta: apis.TypeMeta{
@@ -566,6 +567,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
        // res should return both task's alias
        res = app.getNonTerminatedTaskAlias()
        assert.Equal(t, len(res), 2)
+       assert.Equal(t, app.AreAllTasksTerminated(), false)
        assert.Assert(t, is.Contains(res, "/test-00001"))
        assert.Assert(t, is.Contains(res, "/test-00002"))
 
@@ -576,12 +578,14 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
        // res should retuen empty
        res = app.getNonTerminatedTaskAlias()
        assert.Equal(t, len(res), 0)
+       assert.Equal(t, app.AreAllTasksTerminated(), true)
 
        // set two tasks to one is terminated, another is non-terminated
        task1.sm.SetState(TaskStates().Rejected)
        task2.sm.SetState(TaskStates().Allocated)
        // check the task, should only return task2's alias
        res = app.getNonTerminatedTaskAlias()
+       assert.Equal(t, app.AreAllTasksTerminated(), false)
        assert.Equal(t, len(res), 1)
        assert.Equal(t, res[0], "/test-00002")
 }
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index 76a2c717..cb53bb3f 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -158,6 +158,13 @@ func (ss *KubernetesShim) registerShimLayer() error {
 func (ss *KubernetesShim) schedule() {
        apps := ss.context.GetAllApplications()
        for _, app := range apps {
+               if app.GetApplicationState() == cache.ApplicationStates().Failed {
+                       if app.AreAllTasksTerminated() {
+                               ss.context.RemoveApplicationInternal(app.GetApplicationID())
+                       }
+                       continue
+               }
+
                if app.Schedule() {
                        ss.setOutstandingAppsFound(true)
                }
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 54c2f75b..51253b1d 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -128,6 +128,13 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
        return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
 }
 
+func (fc *MockScheduler) waitForApplicationDeletion(t *testing.T, appID string) {
+       err := utils.WaitForCondition(func() bool {
+               return fc.context.GetApplication(appID) == nil
+       }, time.Second, 5*time.Second)
+       assert.NilError(t, err, "application has not been deleted")
+}
+
 func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expectedState string) {
        deadline := time.Now().Add(10 * time.Second)
        for {
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index bbb1d9ad..9651cd0c 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -144,11 +144,11 @@ partitions:
        // verify app state
        cluster.waitAndAssertApplicationState(t, appID, cache.ApplicationStates().Failed)
 
-       // remove the application
-       // remove task first or removeApplication will fail
-       cluster.context.RemoveTask(appID, "task0001")
-       err = cluster.removeApplication(appID)
-       assert.Assert(t, err == nil)
+       // make the task terminal state
+       cluster.DeletePod(task1)
+       cluster.waitAndAssertTaskState(t, "app0001", "task0001", cache.TaskStates().Completed)
+       // make sure the shim side has clean up the failed app
+       cluster.waitForApplicationDeletion(t, appID)
 
        // submit again
        task1 = createTestPod("root.a", appID, "task0001", taskResource)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to