This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f8da23f [YUNIKORN-2531] Create unit tests for AsyncRMCallback (#832)
4f8da23f is described below

commit 4f8da23f4f482ccfa1c57a1f943777a72b3c07dd
Author: Peter Bacsko <pbac...@cloudera.com>
AuthorDate: Tue May 14 17:01:18 2024 +1000

    [YUNIKORN-2531] Create unit tests for AsyncRMCallback (#832)
    
    Closes: #832
    
    Signed-off-by: Wilfred Spiegelenburg <wilfr...@apache.org>
---
 pkg/cache/context.go                 |   5 -
 pkg/cache/scheduler_callback_test.go | 629 +++++++++++++++++++++++++++++++++++
 pkg/common/events/recorder.go        |   6 -
 3 files changed, 629 insertions(+), 11 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index b97b9289..c58b7914 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -664,11 +664,6 @@ func (ctx *Context) IsPodFitNode(name, node string, 
allocate bool) error {
 }
 
 func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations 
[]string, startIndex int) (int, bool) {
-       // assume minimal pods need killing if running in testing mode
-       if ctx.apiProvider.IsTestingMode() {
-               return startIndex, false
-       }
-
        ctx.lock.RLock()
        defer ctx.lock.RUnlock()
        if pod := ctx.schedulerCache.GetPod(name); pod != nil {
diff --git a/pkg/cache/scheduler_callback_test.go 
b/pkg/cache/scheduler_callback_test.go
new file mode 100644
index 00000000..ce2633ac
--- /dev/null
+++ b/pkg/cache/scheduler_callback_test.go
@@ -0,0 +1,629 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package cache
+
+import (
+       "encoding/json"
+       "strings"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "gotest.tools/v3/assert"
+       v1 "k8s.io/api/core/v1"
+       apis "k8s.io/apimachinery/pkg/apis/meta/v1"
+       k8sEvents "k8s.io/client-go/tools/events"
+       "k8s.io/kubernetes/pkg/scheduler/framework"
+
+       "github.com/apache/yunikorn-k8shim/pkg/client"
+       "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       "github.com/apache/yunikorn-k8shim/pkg/common/events"
+       "github.com/apache/yunikorn-k8shim/pkg/common/test"
+       "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/plugin/predicates"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+func TestUpdateAllocation_NewTask(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               New: []*si.Allocation{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                               NodeID:        fakeNodeName,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       assert.Assert(t, context.schedulerCache.IsAssumedPod(taskUID1))
+       task := context.getTask(appID, taskUID1)
+       err = utils.WaitForCondition(func() bool {
+               return task.GetTaskState() == TaskStates().Bound
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "task has not transitioned to Bound state")
+}
+
+func TestUpdateAllocation_NewTask_TaskNotFound(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       pod := context.getTask(appID, taskUID1).pod
+       context.RemoveTask(appID, taskUID1)
+       context.DeletePod(pod)
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               New: []*si.Allocation{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                               NodeID:        fakeNodeName,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+}
+
+func TestUpdateAllocation_NewTask_AssumePodFails(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       binder := test.NewVolumeBinderMock()
+       const errMsg = "error assuming pod volumes"
+       binder.SetAssumePodVolumesError(errMsg)
+       setVolumeBinder(context, binder)
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               New: []*si.Allocation{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                               NodeID:        fakeNodeName,
+                       },
+               },
+       })
+       assert.Error(t, err, errMsg)
+       assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
+       task := context.getTask(appID, taskUID1)
+       err = utils.WaitForCondition(func() bool {
+               return task.GetTaskState() == TaskStates().Failed
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "task has not transitioned to Failed state")
+}
+
+func TestUpdateAllocation_NewTask_PodAlreadyAssigned(t *testing.T) {
+       callback, context := initCallbackTest(t, true, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               New: []*si.Allocation{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                               NodeID:        fakeNodeName,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       assert.Assert(t, context.schedulerCache.IsAssumedPod(taskUID1))
+       task := context.getTask(appID, taskUID1)
+       assert.Equal(t, TaskStates().Bound, task.GetTaskState())
+       assert.Equal(t, fakeNodeName, task.nodeName)
+       assert.Equal(t, TaskSchedAllocated, task.schedulingState)
+}
+
+func TestUpdateAllocation_AskRejected(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               Rejected: []*si.RejectedAllocationAsk{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       task := context.getTask(appID, taskUID1)
+       err = utils.WaitForCondition(func() bool {
+               return task.GetTaskState() == TaskStates().Failed
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "task has not transitioned to Failed state")
+}
+
+func TestUpdateAllocation_AllocationRejected(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               RejectedAllocations: []*si.RejectedAllocation{
+                       {
+                               ApplicationID: appID,
+                               AllocationKey: taskUID1,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       task := context.getTask(appID, taskUID1)
+       err = utils.WaitForCondition(func() bool {
+               return task.GetTaskState() == TaskStates().Failed
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "task has not transitioned to Failed state")
+}
+
+func TestUpdateAllocation_AllocationReleased(t *testing.T) {
+       // release allocation where terminationType != 
TerminationType_STOPPED_BY_RM
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       err := context.AssumePod(taskUID1, fakeNodeName)
+       assert.NilError(t, err, "could not assume pod")
+       app := context.getApplication(appID)
+       assert.Assert(t, app != nil)
+       app.sm.SetState(ApplicationStates().Running)
+       var deleteCalled atomic.Bool
+       context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod 
*v1.Pod) error { //nolint:errcheck
+               deleteCalled.Store(true)
+               return nil
+       })
+       task := context.getTask(appID, taskUID1)
+       task.allocationKey = taskUID1
+
+       err = callback.UpdateAllocation(&si.AllocationResponse{
+               Released: []*si.AllocationRelease{
+                       {
+                               ApplicationID:   appID,
+                               AllocationKey:   taskUID1,
+                               TerminationType: 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
+       err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, 
time.Second)
+       assert.NilError(t, err, "pod has not been deleted")
+}
+
+func TestUpdateAllocation_AllocationReleased_StoppedByRM(t *testing.T) {
+       // release allocation where terminationType == 
TerminationType_STOPPED_BY_RM
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       err := context.AssumePod(taskUID1, fakeNodeName)
+       assert.NilError(t, err, "could not assume pod")
+       app := context.getApplication(appID)
+       assert.Assert(t, app != nil)
+       app.sm.SetState(ApplicationStates().Running)
+       var deleteCalled atomic.Bool
+       context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod 
*v1.Pod) error { //nolint:errcheck
+               deleteCalled.Store(true)
+               return nil
+       })
+
+       err = callback.UpdateAllocation(&si.AllocationResponse{
+               Released: []*si.AllocationRelease{
+                       {
+                               ApplicationID:   appID,
+                               AllocationKey:   taskUID1,
+                               TerminationType: 
si.TerminationType_STOPPED_BY_RM,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
+       err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, 
500*time.Millisecond)
+       assert.Error(t, err, "timeout waiting for condition") // pod is not 
expected to be deleted
+}
+
+func TestUpdateAllocation_AskReleased(t *testing.T) {
+       callback, context := initCallbackTest(t, false, true)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Running)
+       var deleteCalled atomic.Bool
+       context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod 
*v1.Pod) error { //nolint:errcheck
+               deleteCalled.Store(true)
+               return nil
+       })
+
+       err := callback.UpdateAllocation(&si.AllocationResponse{
+               ReleasedAsks: []*si.AllocationAskRelease{
+                       {
+                               ApplicationID:   appID,
+                               AllocationKey:   taskUID1,
+                               TerminationType: si.TerminationType_TIMEOUT,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating allocation")
+       assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
+       err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, 
time.Second)
+       assert.NilError(t, err, "pod has not been deleted")
+}
+
+func TestUpdateApplication_Accepted(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Submitted)
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Accepted: []*si.AcceptedApplication{
+                       {
+                               ApplicationID: appID,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       err = utils.WaitForCondition(func() bool {
+               return app.sm.Current() == ApplicationStates().Accepted
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "application has not transitioned to Accepted 
state")
+}
+
+func TestUpdateApplication_Rejected(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Submitted)
+       NewPlaceholderManager(context.apiProvider.GetAPIs())
+       recorder := k8sEvents.NewFakeRecorder(1024)
+       events.SetRecorder(recorder)
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Rejected: []*si.RejectedApplication{
+                       {
+                               ApplicationID: appID,
+                               Reason:        "test failure",
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       err = utils.WaitForCondition(func() bool {
+               return app.sm.Current() == ApplicationStates().Failed
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "application has not transitioned to Failed 
state")
+       assert.Equal(t, 1, len(recorder.Events), "no K8s event received")
+       event := <-recorder.Events
+       assert.Assert(t, strings.Contains(event, "test failure"), "event does 
not contain 'test failure': %s", event)
+}
+
+func TestUpdateApplication_Completed(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Updated: []*si.UpdatedApplication{
+                       {
+                               ApplicationID: appID,
+                               State:         ApplicationStates().Completed,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       assert.Assert(t, context.getApplication(appID) == nil, "application was 
not removed")
+}
+
+func TestUpdateApplication_Resuming_AppReserving(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Reserving)
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Updated: []*si.UpdatedApplication{
+                       {
+                               ApplicationID: appID,
+                               State:         ApplicationStates().Resuming,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       err = utils.WaitForCondition(func() bool {
+               return app.sm.Current() == ApplicationStates().Resuming
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "application has not transitioned to Resuming 
state")
+}
+
+func TestUpdateApplication_Resuming_AppNotReserving(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Submitted) // not in Reserving - 
nothing should happen
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Updated: []*si.UpdatedApplication{
+                       {
+                               ApplicationID: appID,
+                               State:         ApplicationStates().Resuming,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       time.Sleep(time.Second) // wait and make sure that the app is still 
submitted
+       assert.Equal(t, ApplicationStates().Submitted, 
app.GetApplicationState(), "application state has been changed unexpectedly")
+}
+
+func TestUpdateApplication_Resuming_AppNotFound(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Reserving)
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Updated: []*si.UpdatedApplication{
+                       {
+                               ApplicationID: "nonExisting",
+                               State:         ApplicationStates().Resuming,
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+}
+
+func TestUpdateApplication_Failing(t *testing.T) {
+       testUpdateApplicationFailure(t, ApplicationStates().Failing)
+}
+
+func TestUpdateApplication_Failed(t *testing.T) {
+       testUpdateApplicationFailure(t, ApplicationStates().Failed)
+}
+
+func testUpdateApplicationFailure(t *testing.T, state string) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       app := context.getApplication(appID)
+       app.sm.SetState(ApplicationStates().Running)
+       NewPlaceholderManager(context.apiProvider.GetAPIs())
+       recorder := k8sEvents.NewFakeRecorder(1024)
+       events.SetRecorder(recorder)
+
+       err := callback.UpdateApplication(&si.ApplicationResponse{
+               Updated: []*si.UpdatedApplication{
+                       {
+                               ApplicationID: appID,
+                               State:         state,
+                               Message:       "test failure",
+                       },
+               },
+       })
+       assert.NilError(t, err, "error updating application")
+       err = utils.WaitForCondition(func() bool {
+               return app.sm.Current() == ApplicationStates().Failing
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err, "application has not transitioned to %s state", 
state)
+       assert.Equal(t, 1, len(recorder.Events), "no K8s event received")
+       event := <-recorder.Events
+       assert.Assert(t, strings.Contains(event, "test failure"), "event does 
not contain 'test failure': %s", event)
+}
+
+func TestUpdateNode_Accepted(t *testing.T) {
+       testUpdateNode(t, "NodeAccepted", &si.NodeResponse{
+               Accepted: []*si.AcceptedNode{
+                       {
+                               NodeID: "testNode",
+                       },
+               },
+       })
+}
+
+func TestUpdateNode_Rejected(t *testing.T) {
+       testUpdateNode(t, "NodeRejected", &si.NodeResponse{
+               Rejected: []*si.RejectedNode{
+                       {
+                               NodeID: "testNode",
+                       },
+               },
+       })
+}
+
+func testUpdateNode(t *testing.T, expectedEvent string, response 
*si.NodeResponse) {
+       callback, _ := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       var nodeName atomic.Value
+       dispatcher.RegisterEventHandler("testNodeHandler", 
dispatcher.EventTypeNode, func(event interface{}) {
+               nodeEvent := event.(CachedSchedulerNodeEvent) //nolint:errcheck
+               if nodeEvent.GetEvent() == expectedEvent {
+                       nodeName.Store(nodeEvent.GetNodeID())
+               }
+       })
+
+       err := callback.UpdateNode(response)
+       assert.NilError(t, err, "error updating node")
+       err = utils.WaitForCondition(func() bool {
+               if val, ok := nodeName.Load().(string); ok {
+                       return val == "testNode"
+               }
+               return false
+       }, 10*time.Millisecond, time.Second)
+       assert.NilError(t, err)
+}
+
+func TestPredicates(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       context.predManager = &mockPredicateManager{}
+
+       // pod not found
+       err := callback.Predicates(&si.PredicatesArgs{AllocationKey: "unknown", 
NodeID: fakeNodeName, Allocate: true})
+       assert.Error(t, err, "predicates were not run because pod was not found 
in cache")
+
+       // pod found
+       err = callback.Predicates(&si.PredicatesArgs{AllocationKey: taskUID1, 
NodeID: fakeNodeName, Allocate: true})
+       assert.NilError(t, err)
+}
+
+func TestPreemptionPredicates(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+       context.predManager = &mockPredicateManager{}
+
+       // pod not found
+       resp := 
callback.PreemptionPredicates(&si.PreemptionPredicatesArgs{AllocationKey: 
"unknown", NodeID: fakeNodeName, StartIndex: 0})
+       assert.Assert(t, !resp.Success, "response should have failed")
+
+       // pod found
+       resp = 
callback.PreemptionPredicates(&si.PreemptionPredicatesArgs{AllocationKey: 
taskUID1, NodeID: fakeNodeName, StartIndex: 0, PreemptAllocationKeys: 
[]string{taskUID1}})
+       assert.Assert(t, resp.Success, "response should have succeeded")
+       assert.Equal(t, int32(0), resp.Index)
+}
+
+func TestSendEvent(t *testing.T) {
+       callback, _ := initCallbackTest(t, false, false)
+       recorder := k8sEvents.NewFakeRecorder(1024)
+       events.SetRecorder(recorder)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       callback.SendEvent([]*si.EventRecord{
+               {
+                       Type:        si.EventRecord_REQUEST,
+                       ObjectID:    taskUID1,
+                       ReferenceID: appID,
+                       Message:     "request test message",
+               },
+               {
+                       Type:              si.EventRecord_NODE,
+                       EventChangeType:   si.EventRecord_ADD,
+                       EventChangeDetail: si.EventRecord_DETAILS_NONE,
+                       ObjectID:          fakeNodeName,
+                       Message:           "node test message",
+               },
+       })
+
+       assert.Equal(t, 2, len(recorder.Events))
+       event := <-recorder.Events
+       assert.Assert(t, strings.Contains(event, "request test message"), 
"event does not contain 'request test message': %s", event)
+       event = <-recorder.Events
+       assert.Assert(t, strings.Contains(event, "node test message"), "event 
does not contain 'node test message': %s", event)
+}
+
+func TestUpdateContainerSchedulingState(t *testing.T) {
+       callback, context := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       
callback.UpdateContainerSchedulingState(&si.UpdateContainerSchedulingStateRequest{
+               State:         si.UpdateContainerSchedulingStateRequest_FAILED,
+               ApplicationID: appID,
+               AllocationKey: taskUID1,
+       })
+
+       task := context.getTask(appID, taskUID1)
+       assert.Equal(t, TaskSchedFailed, task.GetTaskSchedulingState())
+}
+
+func TestCallbackGetStateDump(t *testing.T) {
+       callback, _ := initCallbackTest(t, false, false)
+       defer dispatcher.UnregisterAllEventHandlers()
+       defer dispatcher.Stop()
+
+       stateDumpStr, err := callback.GetStateDump()
+       assert.NilError(t, err, "unable to get state dump")
+       stateDump := make(map[string]interface{})
+       err = json.Unmarshal([]byte(stateDumpStr), &stateDump)
+       assert.NilError(t, err)
+       assert.Assert(t, stateDump["cache"] != nil, "no 'cache' entry in the 
state dump")
+}
+
+var _ predicates.PredicateManager = &mockPredicateManager{}
+
+type mockPredicateManager struct{}
+
+func (m *mockPredicateManager) EventsToRegister(_ framework.QueueingHintFn) 
[]framework.ClusterEventWithHint {
+       return nil
+}
+
+func (m *mockPredicateManager) Predicates(_ *v1.Pod, _ *framework.NodeInfo, _ 
bool) (plugin string, error error) {
+       return "", nil
+}
+
+func (m *mockPredicateManager) PreemptionPredicates(_ *v1.Pod, _ 
*framework.NodeInfo, _ []*v1.Pod, _ int) (index int, ok bool) {
+       return 0, true
+}
+
+func initCallbackTest(t *testing.T, podAssigned, placeholder bool) 
(*AsyncRMCallback, *Context) {
+       context, apiProvider := initContextAndAPIProviderForTest()
+       dispatcher.Start()
+       dispatcher.RegisterEventHandler("TestAppHandler", 
dispatcher.EventTypeApp, context.ApplicationEventHandler())
+       dispatcher.RegisterEventHandler("TestTaskHandler", 
dispatcher.EventTypeTask, context.TaskEventHandler())
+       callback := NewAsyncRMCallback(context)
+       apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) 
error {
+               for _, node := range request.Nodes {
+                       dispatcher.Dispatch(CachedSchedulerNodeEvent{
+                               NodeID: node.NodeID,
+                               Event:  NodeAccepted,
+                       })
+               }
+               return nil
+       })
+       node1 := v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      fakeNodeName,
+                       Namespace: "default",
+                       UID:       "uid_0001",
+               },
+       }
+       context.addNode(&node1)
+       pod := &v1.Pod{
+               TypeMeta: apis.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: "v1",
+               },
+               ObjectMeta: apis.ObjectMeta{
+                       Name: "yunikorn-test-00001",
+                       UID:  taskUID1,
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: appID,
+                       },
+               },
+               Spec: v1.PodSpec{SchedulerName: "yunikorn"},
+       }
+       if podAssigned {
+               pod.Spec.NodeName = fakeNodeName
+       }
+       if placeholder {
+               pod.Annotations[constants.AnnotationPlaceholderFlag] = 
constants.True
+       }
+       context.AddPod(pod)
+       task := context.getTask(appID, taskUID1)
+       assert.Assert(t, task != nil)
+       task.sm.SetState(TaskStates().Scheduling)
+
+       return callback, context
+}
diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go
index 161c0eef..48a06fa7 100644
--- a/pkg/common/events/recorder.go
+++ b/pkg/common/events/recorder.go
@@ -59,9 +59,3 @@ func SetRecorder(recorder events.EventRecorder) {
        eventRecorder = recorder
        once.Do(func() {}) // make sure Do() doesn't fire elsewhere
 }
-
-func SetRecorderForTest(recorder events.EventRecorder) {
-       lock.Lock()
-       defer lock.Unlock()
-       eventRecorder = recorder
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org

Reply via email to