This is an automated email from the ASF dual-hosted git repository.
manirajv06 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
     new 0160be23 [YUNIKORN-3278] Remove code: clean up scheduler_cache.go (#1026)
0160be23 is described below
commit 0160be2367e5897247bd75c3cda08767771b1ed5
Author: Manikandan R <[email protected]>
AuthorDate: Tue May 12 15:22:45 2026 +0530
[YUNIKORN-3278] Remove code: clean up scheduler_cache.go (#1026)
Closes: #1026
Signed-off-by: Manikandan R <[email protected]>
---
pkg/cache/context.go | 8 --
pkg/cache/external/scheduler_cache.go | 158 +++++------------------------
pkg/cache/external/scheduler_cache_dao.go | 16 ++-
pkg/cache/external/scheduler_cache_test.go | 4 -
4 files changed, 31 insertions(+), 155 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 850194ae..e5290d23 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -866,12 +866,6 @@ func (ctx *Context) ForgetPod(name string) {
log.Log(log.ShimContext).Debug("unable to forget pod: not found in
cache", zap.String("pod", name))
}
-// IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
-// cached from a set of taskIDs to perform efficient negative lookups.
-func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
- return ctx.schedulerCache.IsTaskMaybeSchedulable(taskID)
-}
-
func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
if app == nil {
log.Log(log.ShimContext).Debug("In notifyTaskComplete but app
is nil",
@@ -1197,7 +1191,6 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu
			// auto-scaler scans pods whose pod condition is PodScheduled=false && reason=Unschedulable
			// if the pod is skipped because the queue quota has been exceeded, we do not trigger the auto-scaling
task.SetTaskSchedulingState(TaskSchedSkipped)
-			ctx.schedulerCache.NotifyTaskSchedulerAction(task.taskID)
if ctx.updatePodCondition(task,
&v1.PodCondition{
Type: v1.PodScheduled,
@@ -1211,7 +1204,6 @@ func (ctx *Context) HandleContainerStateUpdate(request *si.UpdateContainerSchedu
}
case si.UpdateContainerSchedulingStateRequest_FAILED:
task.SetTaskSchedulingState(TaskSchedFailed)
-			ctx.schedulerCache.NotifyTaskSchedulerAction(task.taskID)
			// set pod condition to Unschedulable in order to trigger auto-scaling
if ctx.updatePodCondition(task,
&v1.PodCondition{
diff --git a/pkg/cache/external/scheduler_cache.go b/pkg/cache/external/scheduler_cache.go
index c1217b98..7aab20db 100644
--- a/pkg/cache/external/scheduler_cache.go
+++ b/pkg/cache/external/scheduler_cache.go
@@ -20,7 +20,6 @@ package external
import (
"fmt"
- "sync/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -41,58 +40,36 @@ import (
// SchedulerCache maintains some critical information about nodes and pods used for scheduling.
// Nodes are cached in the form of de-scheduler nodeInfo. Instead of re-creating all nodes info from scratch,
// we replicate nodes info from de-scheduler, in order to re-use predicates functions.
-//
-// When running YuniKorn as a scheduler plugin, we also track pod allocations that YuniKorn has decided upon, but which
-// have not yet been fulfilled by the default scheduler. This tracking is needed to ensure that we pass along
-// allocations to the default scheduler once (and only once). Allocations can be in one of two states, either pending or
-// in-progress. A pending allocation is one which has been decided upon by YuniKorn but has not yet been communicated
-// to the default scheduler via PreFilter() / Filter(). Once PreFilter() / Filter() pass, the allocation transitions
-// to in-progress to signify that the default scheduler is responsible for fulfilling the allocation. Once PostBind()
-// is called in the plugin to signify completion of the allocation, it is removed.
type SchedulerCache struct {
-	nodesMap              map[string]*framework.NodeInfo // node name to NodeInfo map
-	podsMap               map[string]*v1.Pod
-	pcMap                 map[string]*schedulingv1.PriorityClass
-	assignedPods          map[string]string      // map of pods to the node they are currently assigned to
-	assumedPods           map[string]bool        // map of assumed pods, value indicates if pod volumes are all bound
-	orphanedPods          map[string]*v1.Pod     // map of orphaned pods, keyed by pod UID
-	pendingAllocations    map[string]string      // map of pod to node ID, presence indicates a pending allocation for scheduler
-	inProgressAllocations map[string]string      // map of pod to node ID, presence indicates an in-process allocation for scheduler
-	schedulingTasks       map[string]interface{} // list of task IDs which are currently being processed by the scheduler
-	pvcRefCounts          map[string]map[string]int
-	lock                  locking.RWMutex
-	clients               *client.Clients // client APIs
-	klogger               klog.Logger
+ nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map
+ podsMap map[string]*v1.Pod
+ pcMap map[string]*schedulingv1.PriorityClass
+	assignedPods map[string]string  // map of pods to the node they are currently assigned to
+	assumedPods  map[string]bool    // map of assumed pods, value indicates if pod volumes are all bound
+	orphanedPods map[string]*v1.Pod // map of orphaned pods, keyed by pod UID
+ pvcRefCounts map[string]map[string]int
+ lock locking.RWMutex
+ clients *client.Clients // client APIs
+ klogger klog.Logger
// cached data, re-calculated on demand from nodesMap
nodesInfo []fwk.NodeInfo
nodesInfoPodsWithAffinity []fwk.NodeInfo
nodesInfoPodsWithReqAntiAffinity []fwk.NodeInfo
-
- // task bloom filter, recomputed whenever task scheduling state changes
- taskBloomFilterRef atomic.Pointer[taskBloomFilter]
-}
-
-type taskBloomFilter struct {
- data [4][256]bool
}
func NewSchedulerCache(clients *client.Clients) *SchedulerCache {
cache := &SchedulerCache{
- nodesMap: make(map[string]*framework.NodeInfo),
- podsMap: make(map[string]*v1.Pod),
-		pcMap:                 make(map[string]*schedulingv1.PriorityClass),
- assignedPods: make(map[string]string),
- assumedPods: make(map[string]bool),
- orphanedPods: make(map[string]*v1.Pod),
- pendingAllocations: make(map[string]string),
- inProgressAllocations: make(map[string]string),
- schedulingTasks: make(map[string]interface{}),
- pvcRefCounts: make(map[string]map[string]int),
- clients: clients,
- klogger: klog.NewKlogr(),
+ nodesMap: make(map[string]*framework.NodeInfo),
+ podsMap: make(map[string]*v1.Pod),
+ pcMap: make(map[string]*schedulingv1.PriorityClass),
+ assignedPods: make(map[string]string),
+ assumedPods: make(map[string]bool),
+ orphanedPods: make(map[string]*v1.Pod),
+ pvcRefCounts: make(map[string]map[string]int),
+ clients: clients,
+ klogger: klog.NewKlogr(),
}
- cache.taskBloomFilterRef.Store(&taskBloomFilter{})
return cache
}
@@ -232,8 +209,6 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) (*v1.Node, []*v1.Pod) {
key := string(pod.UID)
delete(cache.assignedPods, key)
delete(cache.assumedPods, key)
- delete(cache.pendingAllocations, key)
- delete(cache.inProgressAllocations, key)
cache.orphanedPods[key] = pod
orphans = append(orphans, pod)
}
@@ -291,24 +266,6 @@ func (cache *SchedulerCache) removePriorityClass(priorityClass *schedulingv1.Pri
delete(cache.pcMap, priorityClass.Name)
}
-// NotifyTaskSchedulerAction registers the fact that a task has been evaluated for scheduling, and consequently the
-// scheduler plugin should move it to the activeQ if requested to do so.
-func (cache *SchedulerCache) NotifyTaskSchedulerAction(taskID string) {
- cache.lock.Lock()
- defer cache.lock.Unlock()
- // verify that the pod exists in the cache, otherwise ignore
- if pod := cache.GetPodNoLock(taskID); pod == nil {
- return
- }
- cache.addSchedulingTask(taskID)
-}
-
-// IsTaskMaybeSchedulable returns true if a task might be currently able to be scheduled. This uses a bloom filter
-// cached from a set of taskIDs to perform efficient negative lookups.
-func (cache *SchedulerCache) IsTaskMaybeSchedulable(taskID string) bool {
- return cache.taskBloomFilterRef.Load().isTaskMaybePresent(taskID)
-}
-
// IsAssumedPod returns if pod is assumed in cache, avoid nil
func (cache *SchedulerCache) IsAssumedPod(podKey string) bool {
cache.lock.RLock()
@@ -374,9 +331,6 @@ func (cache *SchedulerCache) updatePod(pod *v1.Pod) bool {
if utils.IsPodRunning(pod) || utils.IsPodTerminated(pod) {
		// delete all assumed state from cache, as pod has now been bound
delete(cache.assumedPods, key)
- delete(cache.pendingAllocations, key)
- delete(cache.inProgressAllocations, key)
- cache.removeSchedulingTask(key)
}
if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) {
@@ -413,9 +367,6 @@ func (cache *SchedulerCache) updatePod(pod *v1.Pod) bool {
delete(cache.assignedPods, key)
delete(cache.assumedPods, key)
delete(cache.orphanedPods, key)
- delete(cache.pendingAllocations, key)
- delete(cache.inProgressAllocations, key)
- cache.removeSchedulingTask(key)
}
return result
@@ -450,51 +401,10 @@ func (cache *SchedulerCache) removePod(pod *v1.Pod) {
delete(cache.assignedPods, key)
delete(cache.assumedPods, key)
delete(cache.orphanedPods, key)
- delete(cache.pendingAllocations, key)
- delete(cache.inProgressAllocations, key)
- cache.removeSchedulingTask(key)
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
}
-func (cache *SchedulerCache) removeSchedulingTask(taskID string) {
- delete(cache.schedulingTasks, taskID)
- filter := &taskBloomFilter{}
- for taskID := range cache.schedulingTasks {
- filter.addTask(taskID)
- }
- cache.taskBloomFilterRef.Store(filter)
-}
-
-func (cache *SchedulerCache) addSchedulingTask(taskID string) {
- cache.schedulingTasks[taskID] = nil
- filter := &taskBloomFilter{
- data: cache.taskBloomFilterRef.Load().data,
- }
- filter.addTask(taskID)
- cache.taskBloomFilterRef.Store(filter)
-}
-
-func (filter *taskBloomFilter) addTask(taskID string) {
- limit := min(4, len(taskID))
- for i := 0; i < limit; i++ {
- filter.data[i][taskID[i]] = true
- }
-}
-
-func (filter *taskBloomFilter) isTaskMaybePresent(taskID string) bool {
- limit := len(taskID)
- if limit > 4 {
- limit = 4
- }
- for i := 0; i < limit; i++ {
- if !filter.data[i][taskID[i]] {
- return false
- }
- }
- return true
-}
-
func (cache *SchedulerCache) GetPod(uid string) *v1.Pod {
cache.lock.RLock()
defer cache.lock.RUnlock()
@@ -556,9 +466,6 @@ func (cache *SchedulerCache) forgetPod(pod *v1.Pod) {
zap.String("podKey", key))
delete(cache.assumedPods, key)
- delete(cache.pendingAllocations, key)
- delete(cache.inProgressAllocations, key)
- cache.removeSchedulingTask(key)
}
// Implement k8s.io/client-go/listers/core/v1#PodLister interface
@@ -614,10 +521,7 @@ func (cache *SchedulerCache) dumpState(context string) {
zap.Int("nodes", len(cache.nodesMap)),
zap.Int("pods", len(cache.podsMap)),
zap.Int("assumed", len(cache.assumedPods)),
- zap.Int("pendingAllocs", len(cache.pendingAllocations)),
- zap.Int("inProgressAllocs",
len(cache.inProgressAllocations)),
zap.Int("podsAssigned", cache.nodePodCount()),
- zap.Int("schedulingTasks", len(cache.schedulingTasks)),
zap.Any("phases", cache.podPhases()))
}
}
@@ -756,16 +660,6 @@ func (cache *SchedulerCache) GetSchedulerCacheDao() SchedulerCacheDao {
info.AllVolumesBound = allBound
}
}
- for podUID, nodeName := range cache.pendingAllocations {
- if info, ok := podSchedulingInfoByUID[podUID]; ok {
- info.PendingNode = nodeName
- }
- }
- for podUID, nodeName := range cache.inProgressAllocations {
- if info, ok := podSchedulingInfoByUID[podUID]; ok {
- info.InProgressNode = nodeName
- }
- }
podSchedulingInfoByName := make(map[string]PodSchedulingInfoDao)
for _, info := range podSchedulingInfoByUID {
@@ -774,14 +668,12 @@ func (cache *SchedulerCache) GetSchedulerCacheDao() SchedulerCacheDao {
return SchedulerCacheDao{
Statistics: SchedulerCacheStatisticsDao{
- Nodes: len(cache.nodesMap),
- Pods: len(cache.podsMap),
- PriorityClasses: len(cache.pcMap),
- Assumed: len(cache.assumedPods),
- PendingAllocations: len(cache.pendingAllocations),
- InProgressAllocations: len(cache.inProgressAllocations),
- PodsAssigned: cache.nodePodCount(),
- Phases: cache.podPhases(),
+ Nodes: len(cache.nodesMap),
+ Pods: len(cache.podsMap),
+ PriorityClasses: len(cache.pcMap),
+ Assumed: len(cache.assumedPods),
+ PodsAssigned: cache.nodePodCount(),
+ Phases: cache.podPhases(),
},
Nodes: nodes,
Pods: pods,
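
The main removal in this file is the taskBloomFilter and its bookkeeping. Worth noting for reviewers: the deleted filter was not a classic hashed Bloom filter. It kept one 256-entry boolean table per byte position, covering at most the first four bytes of a task ID, so lookups could return false positives but never false negatives (a "false" answer meant the task was definitely not scheduling). A minimal standalone sketch of that behavior, with illustrative names rather than the project's API:

    package main

    import "fmt"

    // filter mirrors the removed taskBloomFilter: one 256-entry table per
    // byte position, covering at most the first 4 bytes of a task ID.
    type filter struct {
        data [4][256]bool
    }

    func (f *filter) add(id string) {
        limit := min(4, len(id)) // built-in min, Go 1.21+
        for i := 0; i < limit; i++ {
            f.data[i][id[i]] = true
        }
    }

    // maybePresent may return true for an ID that was never added (a false
    // positive), but never returns false for an ID that was added.
    func (f *filter) maybePresent(id string) bool {
        limit := min(4, len(id))
        for i := 0; i < limit; i++ {
            if !f.data[i][id[i]] {
                return false
            }
        }
        return true
    }

    func main() {
        f := &filter{}
        f.add("ab12-uid")
        f.add("cd34-uid")
        fmt.Println(f.maybePresent("ab12-uid")) // true: was added
        fmt.Println(f.maybePresent("zz99-uid")) // false: definitely absent
        fmt.Println(f.maybePresent("cb14-uid")) // true: false positive, each byte was seen at its position
    }

Rebuilding the whole filter on every removeSchedulingTask call, as the deleted code did, was the price of supporting deletion, which a plain Bloom filter cannot do in place.
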
diff --git a/pkg/cache/external/scheduler_cache_dao.go b/pkg/cache/external/scheduler_cache_dao.go
index 05d28004..f34df917 100644
--- a/pkg/cache/external/scheduler_cache_dao.go
+++ b/pkg/cache/external/scheduler_cache_dao.go
@@ -34,14 +34,12 @@ type SchedulerCacheDao struct {
}
type SchedulerCacheStatisticsDao struct {
- Nodes int `json:"nodes,omitempty"`
- Pods int `json:"pods,omitempty"`
- PriorityClasses int `json:"priorityClasses,omitempty"`
- Assumed int `json:"assumed,omitempty"`
-	PendingAllocations    int            `json:"pendingAllocations,omitempty"`
-	InProgressAllocations int            `json:"inProgressAllocations,omitempty"`
- PodsAssigned int `json:"podsAssigned,omitempty"`
- Phases map[string]int `json:"phases,omitempty"`
+ Nodes int `json:"nodes,omitempty"`
+ Pods int `json:"pods,omitempty"`
+ PriorityClasses int `json:"priorityClasses,omitempty"`
+ Assumed int `json:"assumed,omitempty"`
+ PodsAssigned int `json:"podsAssigned,omitempty"`
+ Phases map[string]int `json:"phases,omitempty"`
}
type NodeDao struct {
@@ -100,6 +98,4 @@ type PodSchedulingInfoDao struct {
AssignedNode string `json:"assignedNode,omitempty"`
Assumed bool `json:"assumed,omitempty"`
AllVolumesBound bool `json:"allVolumesBound,omitempty"`
- PendingNode string `json:"pendingNode,omitempty"`
- InProgressNode string `json:"inProgressNode,omitempty"`
}
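
With PendingNode and InProgressNode gone, the DAO serializes only the fields that remain, and the omitempty tags drop zero-valued counts from the REST output entirely. A small sketch of the resulting JSON, using a standalone copy of the trimmed statistics struct and assumed counts:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // stats is a standalone copy of the trimmed SchedulerCacheStatisticsDao,
    // for illustration only.
    type stats struct {
        Nodes           int            `json:"nodes,omitempty"`
        Pods            int            `json:"pods,omitempty"`
        PriorityClasses int            `json:"priorityClasses,omitempty"`
        Assumed         int            `json:"assumed,omitempty"`
        PodsAssigned    int            `json:"podsAssigned,omitempty"`
        Phases          map[string]int `json:"phases,omitempty"`
    }

    func main() {
        s := stats{Nodes: 2, Pods: 5, Phases: map[string]int{"Running": 5}}
        out, err := json.Marshal(s)
        if err != nil {
            panic(err)
        }
        // zero-valued counts (priorityClasses, assumed, podsAssigned) are omitted
        fmt.Println(string(out)) // {"nodes":2,"pods":5,"phases":{"Running":5}}
    }
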
diff --git a/pkg/cache/external/scheduler_cache_test.go b/pkg/cache/external/scheduler_cache_test.go
index d97b125f..a24f603d 100644
--- a/pkg/cache/external/scheduler_cache_test.go
+++ b/pkg/cache/external/scheduler_cache_test.go
@@ -921,8 +921,6 @@ func TestGetSchedulerCacheDao(t *testing.T) {
assert.Equal(t, dao.Statistics.PriorityClasses, 0)
assert.Equal(t, dao.Statistics.Assumed, 0)
assert.Equal(t, dao.Statistics.PodsAssigned, 0)
- assert.Equal(t, dao.Statistics.InProgressAllocations, 0)
- assert.Equal(t, dao.Statistics.PendingAllocations, 0)
resourceList := make(map[v1.ResourceName]resource.Quantity)
	resourceList[v1.ResourceName("memory")] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI)
@@ -988,8 +986,6 @@ func TestGetSchedulerCacheDao(t *testing.T) {
assert.Equal(t, dao.Statistics.PriorityClasses, 1)
assert.Equal(t, dao.Statistics.Assumed, 0)
assert.Equal(t, dao.Statistics.PodsAssigned, 0)
- assert.Equal(t, dao.Statistics.InProgressAllocations, 0)
- assert.Equal(t, dao.Statistics.PendingAllocations, 0)
}
func expectHost1AndHost2(t *testing.T, nodesInfo []fwk.NodeInfo) {
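
For anyone tracing the removed plugin-mode handoff: the deleted header comment described allocations moving from pending (decided by YuniKorn, not yet seen by the default scheduler) to in-progress (after PreFilter()/Filter() pass) and finally being dropped at PostBind(). A rough sketch of that lifecycle under those assumptions; the names here are hypothetical, not the shim's actual API:

    package main

    import "fmt"

    // allocTracker sketches the removed two-stage allocation handoff
    // described in the deleted SchedulerCache comment. Illustrative only.
    type allocTracker struct {
        pending    map[string]string // pod UID -> node, decided but not yet seen by default scheduler
        inProgress map[string]string // pod UID -> node, default scheduler now fulfilling it
    }

    func newAllocTracker() *allocTracker {
        return &allocTracker{
            pending:    make(map[string]string),
            inProgress: make(map[string]string),
        }
    }

    // assume records YuniKorn's placement decision.
    func (t *allocTracker) assume(pod, node string) { t.pending[pod] = node }

    // onFilterPassed models PreFilter()/Filter() succeeding: the allocation
    // moves from pending to in-progress so it is handed over exactly once.
    func (t *allocTracker) onFilterPassed(pod string) {
        if node, ok := t.pending[pod]; ok {
            delete(t.pending, pod)
            t.inProgress[pod] = node
        }
    }

    // onPostBind models PostBind(): the allocation is complete and dropped.
    func (t *allocTracker) onPostBind(pod string) { delete(t.inProgress, pod) }

    func main() {
        t := newAllocTracker()
        t.assume("pod-1", "node-a")
        t.onFilterPassed("pod-1")
        t.onPostBind("pod-1")
        fmt.Println(len(t.pending), len(t.inProgress)) // 0 0
    }
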
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]