This is an automated email from the ASF dual-hosted git repository. ccondit pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push: new a1b8480d [YUNIKORN-2588] Shim: Replace AllocationID with AllocationKey (#828) a1b8480d is described below commit a1b8480d9a2b5580bbfdbfccbc2631da343937c5 Author: Craig Condit <ccon...@apache.org> AuthorDate: Sat Apr 27 06:27:16 2024 -0500 [YUNIKORN-2588] Shim: Replace AllocationID with AllocationKey (#828) The scheduler interface has been updated to use AllocationKey instead of AllocationID (as these are now equivalent). Update references to reflect this change. Closes: #828 --- go.mod | 4 +- go.sum | 8 +-- pkg/cache/application.go | 6 +-- pkg/cache/application_state.go | 12 ++--- pkg/cache/application_state_test.go | 60 +++++++++++----------- pkg/cache/application_test.go | 29 ++++++----- pkg/cache/context.go | 1 - pkg/cache/context_test.go | 23 ++++----- pkg/cache/scheduler_callback.go | 11 ++-- pkg/cache/task.go | 36 ++++++------- pkg/cache/task_state.go | 14 ++--- pkg/cache/task_test.go | 28 +++++----- pkg/cache/utils_test.go | 5 +- pkg/common/si_helper.go | 7 ++- pkg/common/si_helper_test.go | 8 +-- pkg/common/test/recoverable_apps_mock.go | 3 +- test/e2e/basic_scheduling/basic_scheduling_test.go | 1 - .../recovery_and_restart_test.go | 1 - 18 files changed, 124 insertions(+), 133 deletions(-) diff --git a/go.mod b/go.mod index 0d16d4c4..b4de07f0 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim go 1.21 require ( - github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8 - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a + github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef + github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/looplab/fsm v1.0.1 diff --git a/go.sum b/go.sum index 5f983d0f..2efcb230 100644 --- a/go.sum +++ b/go.sum @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8 h1:xPcUd/tDOrIatdgyzCgUtRlvfM4s/DN4cS49iyw3rnU= -github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8/go.mod h1:ZXkFNHrLLReWAcEGj6Ya3hkmr5lMpa9WgIy4Lx0dlxw= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a h1:H978zsTL2FvbRFnySO83DOFLO33PwHWFdmHvMoSVXsc= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240423191701-8c98b1604a7a/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= +github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef h1:m/wkG8mJqJ/eAihWR/g5IVfkHl79ve5geXXKM0U/slE= +github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef/go.mod h1:BaNNx6FksvVu/8tfo7qNQvVhAsD284ybdJnLoFIexgA= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1 h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240425182941-07f5695119a1/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= diff --git a/pkg/cache/application.go b/pkg/cache/application.go index f43d97f4..3420cc43 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -598,14 +598,14 @@ func (app *Application) handleFailApplicationEvent(errMsg string) { } } -func (app *Application) handleReleaseAppAllocationEvent(allocationID string, terminationType string) { +func (app *Application) handleReleaseAppAllocationEvent(allocationKey string, terminationType string) { log.Log(log.ShimCacheApplication).Info("try to release pod from application", zap.String("appID", app.applicationID), - zap.String("allocationID", allocationID), + zap.String("allocationKey", allocationKey), zap.String("terminationType", terminationType)) for _, task := range app.taskMap { - if task.allocationID == allocationID { + if task.allocationKey == allocationKey { task.setTaskTerminationType(terminationType) err := task.DeleteTaskPod() if err != nil { diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go index cb629394..b64f9486 100644 --- a/pkg/cache/application_state.go +++ b/pkg/cache/application_state.go @@ -267,15 +267,15 @@ func (ue UpdateApplicationReservationEvent) GetApplicationID() string { // ------------------------ type ReleaseAppAllocationEvent struct { applicationID string - allocationID string + allocationKey string terminationType string event ApplicationEventType } -func NewReleaseAppAllocationEvent(appID string, allocTermination si.TerminationType, allocationID string) ReleaseAppAllocationEvent { +func NewReleaseAppAllocationEvent(appID string, allocTermination si.TerminationType, allocationKey string) ReleaseAppAllocationEvent { return ReleaseAppAllocationEvent{ applicationID: appID, - allocationID: allocationID, + allocationKey: allocationKey, terminationType: si.TerminationType_name[int32(allocTermination)], event: ReleaseAppAllocation, } @@ -287,7 +287,7 @@ func (re ReleaseAppAllocationEvent) GetApplicationID() string { func (re ReleaseAppAllocationEvent) GetArgs() []interface{} { args := make([]interface{}, 2) - args[0] = re.allocationID + args[0] = re.allocationKey args[1] = re.terminationType return args } @@ -546,9 +546,9 @@ func newAppState() *fsm.FSM { //nolint:funlen log.Log(log.ShimFSM).Error("fail to parse event arg", zap.Error(err)) return } - allocationID := eventArgs[0] + allocationKey := eventArgs[0] terminationType := eventArgs[1] - app.handleReleaseAppAllocationEvent(allocationID, terminationType) + app.handleReleaseAppAllocationEvent(allocationKey, terminationType) }, ReleaseAppAllocationAsk.String(): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck diff --git a/pkg/cache/application_state_test.go b/pkg/cache/application_state_test.go index c2f94e93..1247a488 100644 --- a/pkg/cache/application_state_test.go +++ b/pkg/cache/application_state_test.go @@ -671,22 +671,22 @@ func TestUpdateApplicationReservationEventGetApplicationID(t *testing.T) { func TestNewReleaseAppAllocationEvent(t *testing.T) { tests := []struct { - name string - appID, allocationID string - terminationType si.TerminationType - wantID, wantAllocationID, wantType string - wantEvent ApplicationEventType + name string + appID, allocationKey string + terminationType si.TerminationType + wantID, wantAllocationKey, wantType string + wantEvent ApplicationEventType }{ - {TestCreateName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, "testAppId001", "testAllocationID001", "TIMEOUT", ReleaseAppAllocation}, + {TestCreateName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT", ReleaseAppAllocation}, } for _, tt := range tests { - instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID) + instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationKey) t.Run(tt.name, func(t *testing.T) { - if instance.applicationID != tt.wantID || instance.allocationID != tt.wantAllocationID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent { + if instance.applicationID != tt.wantID || instance.allocationKey != tt.wantAllocationKey || instance.terminationType != tt.wantType || instance.event != tt.wantEvent { t.Errorf("want %s %s %s %s, got %s %s %s %s", - tt.wantID, tt.wantAllocationID, tt.wantType, tt.wantEvent, - instance.applicationID, instance.allocationID, instance.terminationType, instance.event) + tt.wantID, tt.wantAllocationKey, tt.wantType, tt.wantEvent, + instance.applicationID, instance.allocationKey, instance.terminationType, instance.event) } }) } @@ -694,16 +694,16 @@ func TestNewReleaseAppAllocationEvent(t *testing.T) { func TestReleaseAppAllocationEventGetEvent(t *testing.T) { tests := []struct { - name string - appID, allocationID string - terminationType si.TerminationType - wantEvent ApplicationEventType + name string + appID, allocationKey string + terminationType si.TerminationType + wantEvent ApplicationEventType }{ - {TestEventName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, ReleaseAppAllocation}, + {TestEventName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, ReleaseAppAllocation}, } for _, tt := range tests { - instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID) + instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationKey) event := instance.GetEvent() t.Run(tt.name, func(t *testing.T) { if event != tt.wantEvent.String() { @@ -715,18 +715,18 @@ func TestReleaseAppAllocationEventGetEvent(t *testing.T) { func TestReleaseAppAllocationEventGetArgs(t *testing.T) { tests := []struct { - name string - appID, allocationID string - terminationType si.TerminationType - wantLen int - castOk []bool - wantArg []string + name string + appID, allocationKey string + terminationType si.TerminationType + wantLen int + castOk []bool + wantArg []string }{ - {TestArgsName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, 2, []bool{true, true}, []string{"testAllocationID001", "TIMEOUT"}}, + {TestArgsName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, 2, []bool{true, true}, []string{"testTaskId001", "TIMEOUT"}}, } for _, tt := range tests { - instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID) + instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationKey) args := instance.GetArgs() t.Run(tt.name, func(t *testing.T) { if len(args) != tt.wantLen { @@ -748,16 +748,16 @@ func TestReleaseAppAllocationEventGetArgs(t *testing.T) { func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) { tests := []struct { - name string - appID, allocationID string - terminationType si.TerminationType - wantID string + name string + appID, allocationKey string + terminationType si.TerminationType + wantID string }{ - {TestAppIDName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, "testAppId001"}, + {TestAppIDName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001"}, } for _, tt := range tests { - instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID) + instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationKey) appID := instance.GetApplicationID() t.Run(tt.name, func(t *testing.T) { if appID != tt.wantID { diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go index 874a6e14..56c9e818 100644 --- a/pkg/cache/application_test.go +++ b/pkg/cache/application_test.go @@ -442,9 +442,9 @@ func TestReleaseAppAllocation(t *testing.T) { app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, ms) task := NewTask("task01", app, context, pod) app.addTask(task) - task.allocationID = taskAllocationID + task.allocationKey = task.taskID // app must be running states - err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID)) + err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, task.taskID)) if err == nil { // this should give an error t.Error("expecting error got 'nil'") @@ -452,7 +452,7 @@ func TestReleaseAppAllocation(t *testing.T) { // set app states to running, let event can be trigger app.SetState(ApplicationStates().Running) assertAppState(t, app, ApplicationStates().Running, 3*time.Second) - err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID)) + err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, task.taskID)) assert.NilError(t, err) // after handle release event the states of app must be running assertAppState(t, app, ApplicationStates().Running, 3*time.Second) @@ -823,7 +823,7 @@ func TestTryReservePostRestart(t *testing.T) { Containers: containers, }, }) - task0.allocationID = string(task0.pod.UID) + task0.allocationKey = string(task0.pod.UID) task0.nodeName = "fake-host" task0.sm.SetState(TaskStates().Allocated) @@ -1002,9 +1002,10 @@ func TestReleaseAppAllocationInFailingState(t *testing.T) { app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, ms) task := NewTask("task01", app, context, pod) app.addTask(task) - task.allocationID = taskAllocationID + task.allocationKey = task.taskID + // app must be running states - err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID)) + err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, task.taskID)) if err == nil { // this should give an error t.Error("expecting error got 'nil'") @@ -1012,12 +1013,12 @@ func TestReleaseAppAllocationInFailingState(t *testing.T) { // set app states to running, let event can be trigger app.SetState(ApplicationStates().Running) assertAppState(t, app, ApplicationStates().Running, 3*time.Second) - err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID)) + err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, task.taskID)) assert.NilError(t, err) // after handle release event the states of app must be running assertAppState(t, app, ApplicationStates().Running, 3*time.Second) app.SetState(ApplicationStates().Failing) - err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID)) + err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, task.taskID)) assert.NilError(t, err) // after handle release event the states of app must be failing assertAppState(t, app, ApplicationStates().Failing, 3*time.Second) @@ -1057,7 +1058,7 @@ func TestResumingStateTransitions(t *testing.T) { // Add tasks app.addTask(task1) app.addTask(task2) - task1.allocationID = taskAllocationID + task1.allocationKey = task1.taskID context.addApplicationToContext(app) // Set app state to "reserving" @@ -1127,7 +1128,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "pod00001", Namespace: "default", - UID: "UID-POD-00001", + UID: "task01", Labels: map[string]string{ "queue": "root.a", "applicationId": "app00001", @@ -1150,7 +1151,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) { ObjectMeta: apis.ObjectMeta{ Name: "pod00002", Namespace: "default", - UID: "UID-POD-00002", + UID: "task02", Labels: map[string]string{ "queue": "root.a", "applicationId": "app00001", @@ -1169,7 +1170,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) { assert.Equal(t, len(app.GetNewTasks()), 1) appID := "app00001" - allocationID := "UID-POD-00002" + allocationKey := "task02" task1 := context.AddTask(&AddTaskRequest{ Metadata: TaskMetadata{ @@ -1185,12 +1186,12 @@ func TestPlaceholderTimeoutEvents(t *testing.T) { _, taskErr := app.GetTask("task02") assert.NilError(t, taskErr, "Task should exist") - task1.allocationID = allocationID + task1.allocationKey = allocationKey // set app states to running, let event can be trigger app.SetState(ApplicationStates().Running) assertAppState(t, app, ApplicationStates().Running, 3*time.Second) - err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, allocationID)) + err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, allocationKey)) assert.NilError(t, err) // after handle release event the states of app must be running assertAppState(t, app, ApplicationStates().Running, 3*time.Second) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 6c9f34e2..301f1096 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -1737,7 +1737,6 @@ func getExistingAllocation(pod *v1.Pod) *si.Allocation { return &si.Allocation{ AllocationKey: string(pod.UID), AllocationTags: meta.Tags, - AllocationID: string(pod.UID), ResourcePerAlloc: common.GetPodResource(pod), NodeID: pod.Spec.NodeName, ApplicationID: meta.ApplicationID, diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index 33e4f10f..103c9369 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -870,7 +870,7 @@ func TestRecoverTask(t *testing.T) { applicationID: alloc.ApplicationID, taskID: alloc.AllocationKey, nodeID: alloc.NodeID, - allocationID: alloc.AllocationID, + allocationKey: alloc.AllocationKey, event: TaskAllocated, }) } @@ -922,7 +922,7 @@ func TestRecoverTask(t *testing.T) { err := utils.WaitForCondition(func() bool { return task.GetTaskState() == TaskStates().Bound }, 100*time.Millisecond, 3*time.Second) - assert.NilError(t, err, "failed to wait for allocation allocationID being set for task") + assert.NilError(t, err, "failed to wait for allocation allocationKey being set for task") // add a tasks to the existing application // this task was already completed with state: Succeed @@ -971,11 +971,11 @@ func TestRecoverTask(t *testing.T) { assert.Equal(t, len(app.getTasks(TaskStates().New)), 1) taskInfoVerifiers := []struct { - taskID string - expectedState string - expectedAllocationID string - expectedPodName string - expectedNodeName string + taskID string + expectedState string + expectedAllocationKey string + expectedPodName string + expectedNodeName string }{ {taskUID1, TaskStates().Bound, taskUID1, "pod1", fakeNodeName}, {taskUID2, TaskStates().Completed, taskUID2, "pod2", fakeNodeName}, @@ -989,7 +989,7 @@ func TestRecoverTask(t *testing.T) { rt, err := app.GetTask(tt.taskID) assert.NilError(t, err) assert.Equal(t, rt.GetTaskState(), tt.expectedState) - assert.Equal(t, rt.allocationID, tt.expectedAllocationID) + assert.Equal(t, rt.allocationKey, tt.expectedAllocationKey) assert.Equal(t, rt.pod.Name, tt.expectedPodName) assert.Equal(t, rt.alias, fmt.Sprintf("%s/%s", podNamespace, tt.expectedPodName)) }) @@ -1011,7 +1011,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { applicationID: alloc.ApplicationID, taskID: alloc.AllocationKey, nodeID: alloc.NodeID, - allocationID: alloc.AllocationID, + allocationKey: alloc.AllocationKey, event: TaskAllocated, }) } @@ -1061,7 +1061,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { err := utils.WaitForCondition(func() bool { return task0.GetTaskState() == TaskStates().Bound }, 100*time.Millisecond, 3*time.Second) - assert.NilError(t, err, "failed to wait for allocation allocationID being set for task0") + assert.NilError(t, err, "failed to wait for allocation allocationKey being set for task0") task1 := context.AddTask(&AddTaskRequest{ Metadata: TaskMetadata{ @@ -1080,7 +1080,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) { err = utils.WaitForCondition(func() bool { return task1.GetTaskState() == TaskStates().Bound }, 100*time.Millisecond, 3*time.Second) - assert.NilError(t, err, "failed to wait for allocation allocationID being set for task1") + assert.NilError(t, err, "failed to wait for allocation allocationKey being set for task1") // app should have 2 tasks recovered app, exist := context.applications[appID] @@ -1955,7 +1955,6 @@ func TestGetExistingAllocation(t *testing.T) { alloc := getExistingAllocation(pod) assert.Equal(t, alloc.ApplicationID, "app00001") assert.Equal(t, alloc.AllocationKey, string(pod.UID)) - assert.Equal(t, alloc.AllocationID, string(pod.UID)) assert.Equal(t, alloc.NodeID, "allocated-node") } diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go index 81163572..e7ce30dc 100644 --- a/pkg/cache/scheduler_callback.go +++ b/pkg/cache/scheduler_callback.go @@ -51,14 +51,13 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons // got allocation for pod, bind pod to the scheduled node log.Log(log.ShimRMCallback).Debug("callback: response to new allocation", zap.String("allocationKey", alloc.AllocationKey), - zap.String("allocationID", alloc.AllocationID), zap.String("applicationID", alloc.ApplicationID), zap.String("nodeID", alloc.NodeID)) // update cache task := callback.context.getTask(alloc.ApplicationID, alloc.AllocationKey) if task != nil { - task.setAllocationID(alloc.AllocationID) + task.setAllocationKey(alloc.AllocationKey) } else { log.Log(log.ShimRMCallback).Warn("Unable to get task", zap.String("taskID", alloc.AllocationKey)) } @@ -72,9 +71,9 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons if task != nil { if utils.IsAssignedPod(task.GetTaskPod()) { // task is already bound, fixup state and continue - task.MarkPreviouslyAllocated(alloc.AllocationID, alloc.NodeID) + task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID) } else { - ev := NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.AllocationID, alloc.NodeID) + ev := NewAllocateTaskEvent(app.GetApplicationID(), task.taskID, alloc.AllocationKey, alloc.NodeID) dispatcher.Dispatch(ev) } } @@ -105,7 +104,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons for _, release := range response.Released { log.Log(log.ShimRMCallback).Debug("callback: response to released allocations", - zap.String("AllocationID", release.AllocationID)) + zap.String("AllocationKey", release.AllocationKey)) // update cache callback.context.ForgetPod(release.GetAllocationKey()) @@ -113,7 +112,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons // TerminationType 0 mean STOPPED_BY_RM if release.TerminationType != si.TerminationType_STOPPED_BY_RM { // send release app allocation to application states machine - ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.AllocationID) + ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.AllocationKey) dispatcher.Dispatch(ev) } } diff --git a/pkg/cache/task.go b/pkg/cache/task.go index bb54f4c0..05b1b889 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -44,7 +44,7 @@ type Task struct { alias string applicationID string application *Application - allocationID string + allocationKey string resource *si.Resource pod *v1.Pod podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons @@ -169,10 +169,10 @@ func (task *Task) getTaskGroupName() string { return task.taskGroupName } -func (task *Task) getTaskAllocationID() string { +func (task *Task) getNodeName() string { task.lock.RLock() defer task.lock.RUnlock() - return task.allocationID + return task.nodeName } func (task *Task) DeleteTaskPod() error { @@ -210,13 +210,13 @@ func (task *Task) initialize() { // the resources were already released, instead of starting // from New, directly set the task to Completed if utils.IsPodTerminated(task.pod) { - task.allocationID = string(task.pod.UID) + task.allocationKey = string(task.pod.UID) task.nodeName = task.pod.Spec.NodeName task.sm.SetState(TaskStates().Completed) log.Log(log.ShimCacheTask).Info("set task as Completed", zap.String("appID", task.applicationID), zap.String("taskID", task.taskID), - zap.String("allocationID", task.allocationID), + zap.String("allocationKey", task.allocationKey), zap.String("nodeName", task.nodeName)) } } @@ -260,12 +260,12 @@ func (task *Task) SetTaskSchedulingState(state TaskSchedulingState) { task.schedulingState = state } -func (task *Task) MarkPreviouslyAllocated(allocationID string, nodeID string) { +func (task *Task) MarkPreviouslyAllocated(allocationKey string, nodeID string) { task.sm.SetState(TaskStates().Bound) task.lock.Lock() defer task.lock.Unlock() task.schedulingState = TaskSchedAllocated - task.allocationID = allocationID + task.allocationKey = allocationKey task.nodeName = nodeID if task.placeholder { log.Log(log.ShimCacheTask).Info("placeholder is bound", @@ -413,9 +413,9 @@ func (task *Task) postTaskAllocated() { // If we find the task is already in Completed state while handling TaskAllocated // event, we need to explicitly release this allocation because it is no // longer valid. -func (task *Task) beforeTaskAllocated(eventSrc string, allocationID string, nodeID string) { - // task is allocated on a node with a allocationID set the details in the task here to allow referencing later. - task.allocationID = allocationID +func (task *Task) beforeTaskAllocated(eventSrc string, allocationKey string, nodeID string) { + // task is allocated on a node with a allocationKey set the details in the task here to allow referencing later. + task.allocationKey = allocationKey task.nodeName = nodeID // If the task is Completed the pod was deleted on K8s but the core was not aware yet. // Notify the core to release this allocation to avoid resource leak. @@ -423,7 +423,7 @@ func (task *Task) beforeTaskAllocated(eventSrc string, allocationID string, node if eventSrc == TaskStates().Completed { log.Log(log.ShimCacheTask).Info("task is already completed, invalidate the allocation", zap.String("currentTaskState", eventSrc), - zap.String("allocationID", allocationID), + zap.String("allocationKey", allocationKey), zap.String("allocatedNode", nodeID)) task.releaseAllocation() } @@ -505,7 +505,7 @@ func (task *Task) releaseAllocation() { zap.String("applicationID", task.applicationID), zap.String("taskID", task.taskID), zap.String("taskAlias", task.alias), - zap.String("allocationID", task.allocationID), + zap.String("allocationKey", task.allocationKey), zap.String("task", task.GetTaskState()), zap.String("terminationType", task.terminationType)) @@ -516,17 +516,17 @@ func (task *Task) releaseAllocation() { s := TaskStates() switch task.GetTaskState() { case s.New, s.Pending, s.Scheduling, s.Rejected: - releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.allocationID, task.application.partition, task.terminationType) + releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.allocationKey, task.application.partition, task.terminationType) default: - if task.allocationID == "" { - log.Log(log.ShimCacheTask).Warn("BUG: task allocation allocationID is empty on release", + if task.allocationKey == "" { + log.Log(log.ShimCacheTask).Warn("BUG: task allocationKey is empty on release", zap.String("applicationID", task.applicationID), zap.String("taskID", task.taskID), zap.String("taskAlias", task.alias), zap.String("task", task.GetTaskState())) } releaseRequest = common.CreateReleaseRequestForTask( - task.applicationID, task.taskID, task.allocationID, task.application.partition, task.terminationType) + task.applicationID, task.taskID, task.allocationKey, task.application.partition, task.terminationType) } if releaseRequest.Releases != nil { @@ -588,10 +588,10 @@ func (task *Task) UpdatePodCondition(podCondition *v1.PodCondition) (bool, *v1.P return false, pod } -func (task *Task) setAllocationID(allocationID string) { +func (task *Task) setAllocationKey(allocationKey string) { task.lock.Lock() defer task.lock.Unlock() - task.allocationID = allocationID + task.allocationKey = allocationKey } func (task *Task) failWithEvent(errorMessage, actionReason string) { diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go index f0371bd6..d0ee58ef 100644 --- a/pkg/cache/task_state.go +++ b/pkg/cache/task_state.go @@ -127,15 +127,15 @@ type AllocatedTaskEvent struct { taskID string event TaskEventType nodeID string - allocationID string + allocationKey string } -func NewAllocateTaskEvent(appID string, taskID string, allocationID string, nid string) AllocatedTaskEvent { +func NewAllocateTaskEvent(appID string, taskID string, allocationKey string, nid string) AllocatedTaskEvent { return AllocatedTaskEvent{ applicationID: appID, taskID: taskID, event: TaskAllocated, - allocationID: allocationID, + allocationKey: allocationKey, nodeID: nid, } } @@ -146,7 +146,7 @@ func (ae AllocatedTaskEvent) GetEvent() string { func (ae AllocatedTaskEvent) GetArgs() []interface{} { args := make([]interface{}, 2) - args[0] = ae.allocationID + args[0] = ae.allocationKey args[1] = ae.nodeID return args } @@ -419,15 +419,15 @@ func newTaskState() *fsm.FSM { }, beforeHook(TaskAllocated): func(_ context.Context, event *fsm.Event) { task := event.Args[0].(*Task) //nolint:errcheck - // All allocation events must include the allocationID and nodeID passed from the core + // All allocation events must include the allocationKey and nodeID passed from the core eventArgs := make([]string, 2) if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil { log.Log(log.ShimFSM).Error("failed to parse event arg", zap.Error(err)) return } - allocationID := eventArgs[0] + allocationKey := eventArgs[0] nodeID := eventArgs[1] - task.beforeTaskAllocated(event.Src, allocationID, nodeID) + task.beforeTaskAllocated(event.Src, allocationKey, nodeID) }, beforeHook(CompleteTask): func(_ context.Context, event *fsm.Event) { task := event.Args[0].(*Task) //nolint:errcheck diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index c08c4f03..3b11bbc5 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -170,7 +170,7 @@ func TestReleaseTaskAllocation(t *testing.T) { }, ObjectMeta: metav1.ObjectMeta{ Name: "pod-resource-test-00001", - UID: "UID-00001", + UID: "task01", }, Spec: v1.PodSpec{ Containers: containers, @@ -201,9 +201,9 @@ func TestReleaseTaskAllocation(t *testing.T) { assert.Equal(t, task.GetTaskState(), TaskStates().Allocated) // bind a task is a async process, wait for it to happen err = common.WaitFor(100*time.Millisecond, 3*time.Second, func() bool { - return task.getTaskAllocationID() == string(pod.UID) + return task.getNodeName() == "node-1" }) - assert.NilError(t, err, "failed to wait for allocation allocationID being set for task") + assert.NilError(t, err, "failed to wait for allocation allocationKey being set for task") // bound event3 := NewBindTaskEvent(app.applicationID, task.taskID) @@ -218,7 +218,7 @@ func TestReleaseTaskAllocation(t *testing.T) { assert.Assert(t, request.Releases.AllocationsToRelease != nil) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID, "UID-00001") + assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") @@ -235,7 +235,7 @@ func TestReleaseTaskAllocation(t *testing.T) { // 2 updates call, 1 for submit, 1 for release assert.Equal(t, mockedApiProvider.GetSchedulerAPIUpdateAllocationCount(), int32(2)) - // New to Failed, no AllocationID is set (only ask is released) + // New to Failed, no AllocationKey is set (only ask is released) task = NewTask("task01", app, mockedContext, pod) mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { assert.Assert(t, request.Releases != nil) @@ -250,16 +250,16 @@ func TestReleaseTaskAllocation(t *testing.T) { err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test failure")) assert.NilError(t, err, "failed to handle FailTask event") - // Scheduling to Failed, AllocationID is set (ask+allocation are both released) + // Scheduling to Failed, AllocationKey is set (ask+allocation are both released) task = NewTask("task01", app, mockedContext, pod) - task.setAllocationID("alloc-0") + task.setAllocationKey("task01") task.sm.SetState(TaskStates().Scheduling) mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID, "alloc-0") + assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") @@ -618,10 +618,9 @@ func TestHandleSubmitTaskEvent(t *testing.T) { func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { const ( - podUID = "UID-00001" - appID = "app-test-001" - queueName = "root.abc" - allocationID = "allocationid-xyz" + podUID = "UID-00001" + appID = "app-test-001" + queueName = "root.abc" ) mockedContext := initContextForTest() mockedAPIProvider, ok := mockedContext.apiProvider.(*client.MockedAPIProvider) @@ -679,7 +678,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { // can be released from the core to avoid resource leak alloc := &si.Allocation{ AllocationKey: string(pod1.UID), - AllocationID: allocationID, NodeID: "fake-node", ApplicationID: appID, PartitionName: "default", @@ -691,10 +689,10 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { "allocationsToRelease is not in the expected length") allocToRelease := request.Releases.AllocationsToRelease[0] assert.Equal(t, allocToRelease.ApplicationID, alloc.ApplicationID) - assert.Equal(t, allocToRelease.AllocationID, alloc.AllocationID) + assert.Equal(t, allocToRelease.AllocationKey, alloc.AllocationKey) return nil }) - ev1 := NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.AllocationID, alloc.NodeID) + ev1 := NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.AllocationKey, alloc.NodeID) err = task1.handle(ev1) assert.NilError(t, err, "failed to handle AllocateTask event") assert.Equal(t, task1.GetTaskState(), TaskStates().Completed) diff --git a/pkg/cache/utils_test.go b/pkg/cache/utils_test.go index 5c79f8cc..1e527399 100644 --- a/pkg/cache/utils_test.go +++ b/pkg/cache/utils_test.go @@ -30,9 +30,8 @@ import ( ) const ( - appID = "app01" - app2ID = "app02" - taskAllocationID = "ALLOCATIONID01" + appID = "app01" + app2ID = "app02" ) //nolint:funlen diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index b679a844..9c4520a0 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -95,7 +95,6 @@ func CreateAllocationForTask(appID, taskID, nodeID string, resource *si.Resource allocation := si.Allocation{ AllocationKey: taskID, AllocationTags: CreateTagsForTask(pod), - AllocationID: taskID, ResourcePerAlloc: resource, Priority: CreatePriorityForTask(pod), NodeID: nodeID, @@ -122,13 +121,13 @@ func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType return si.TerminationType_STOPPED_BY_RM } -func CreateReleaseRequestForTask(appID, taskID, allocationID, partition, terminationType string) *si.AllocationRequest { +func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition, terminationType string) *si.AllocationRequest { var allocToRelease []*si.AllocationRelease - if allocationID != "" { + if allocationKey != "" { allocToRelease = make([]*si.AllocationRelease, 1) allocToRelease[0] = &si.AllocationRelease{ ApplicationID: appID, - AllocationID: allocationID, + AllocationKey: allocationKey, PartitionName: partition, TerminationType: GetTerminationTypeFromString(terminationType), Message: "task completed", diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index 04ac266e..67be6147 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -32,21 +32,21 @@ import ( const nodeID = "node-01" func TestCreateReleaseRequestForTask(t *testing.T) { - // with "allocationID" - request := CreateReleaseRequestForTask("app01", "task01", "alloc01", "default", "STOPPED_BY_RM") + // with allocationKey + request := CreateReleaseRequestForTask("app01", "task01", "task01", "default", "STOPPED_BY_RM") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID, "alloc01") + assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - // without allocationID + // without allocationKey request = CreateReleaseRequestForTask("app01", "task01", "", "default", "STOPPED_BY_RM") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease == nil) diff --git a/pkg/common/test/recoverable_apps_mock.go b/pkg/common/test/recoverable_apps_mock.go index 3ebb1b12..9efdd268 100644 --- a/pkg/common/test/recoverable_apps_mock.go +++ b/pkg/common/test/recoverable_apps_mock.go @@ -38,9 +38,8 @@ func (m *MockedRecoverableAppManager) ListPods() ([]*v1.Pod, error) { func (m *MockedRecoverableAppManager) GetExistingAllocation(pod *v1.Pod) *si.Allocation { return &si.Allocation{ - AllocationKey: pod.Name, + AllocationKey: string(pod.UID), AllocationTags: nil, - AllocationID: string(pod.UID), ResourcePerAlloc: nil, Priority: 0, NodeID: pod.Spec.NodeName, diff --git a/test/e2e/basic_scheduling/basic_scheduling_test.go b/test/e2e/basic_scheduling/basic_scheduling_test.go index 4a7de566..9ca63d7f 100644 --- a/test/e2e/basic_scheduling/basic_scheduling_test.go +++ b/test/e2e/basic_scheduling/basic_scheduling_test.go @@ -111,7 +111,6 @@ var _ = ginkgo.Describe("", func() { gomega.Ω(allocation.AllocationKey).NotTo(gomega.BeNil()) gomega.Ω(allocation.NodeID).NotTo(gomega.BeNil()) gomega.Ω(allocation.Partition).NotTo(gomega.BeNil()) - gomega.Ω(allocation.AllocationID).NotTo(gomega.BeNil()) gomega.Ω(allocation.ApplicationID).To(gomega.Equal(sleepRespPod.ObjectMeta.Labels["applicationId"])) core := sleepRespPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() mem := sleepRespPod.Spec.Containers[0].Resources.Requests.Memory().Value() diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go b/test/e2e/recovery_and_restart/recovery_and_restart_test.go index 0492890d..5e89952f 100644 --- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go +++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go @@ -142,7 +142,6 @@ var _ = ginkgo.Describe("", func() { gomega.Ω(allocations.AllocationKey).NotTo(gomega.BeNil()) gomega.Ω(allocations.NodeID).NotTo(gomega.BeNil()) gomega.Ω(allocations.Partition).NotTo(gomega.BeNil()) - gomega.Ω(allocations.AllocationID).NotTo(gomega.BeNil()) gomega.Ω(allocations.ApplicationID).To(gomega.Equal(sleepRespPod.ObjectMeta.Labels["applicationId"])) core := sleepRespPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() mem := sleepRespPod.Spec.Containers[0].Resources.Requests.Memory().Value() --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org