This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new 03978808 feat(vermeer): support task priority based scheduling (#336)
03978808 is described below
commit 039788087ff4d3d05ad0a4b9cc05789d61af3bfb
Author: Jingkai Yang <[email protected]>
AuthorDate: Mon Oct 27 16:35:23 2025 +0800
feat(vermeer): support task priority based scheduling (#336)
* feat(Scheduler): Support priority/elder/depends based scheduling.
* chore: fix some tiny errors
* Update vermeer/.gitignore
* Update CodeQL workflow to use latest actions
---------
Co-authored-by: imbajin <[email protected]>
---
.github/workflows/codeql-analysis.yml | 14 +-
vermeer/.gitignore | 4 +
vermeer/README.md | 22 +
vermeer/README.zh-CN.md | 20 +
vermeer/apps/graphio/local_file.go | 1 +
vermeer/apps/master/bl/compute_task.go | 2 +
vermeer/apps/master/bl/grpc_handlers.go | 6 +
vermeer/apps/master/bl/load_task.go | 3 +
vermeer/apps/master/bl/scheduler_bl.go | 490 ++++++++++++++-----
vermeer/apps/master/bl/task_bl.go | 61 ++-
vermeer/apps/master/bl/task_creator.go | 22 +
vermeer/apps/master/bl/worker_bl.go | 2 +-
vermeer/apps/master/master_main.go | 1 +
vermeer/apps/master/schedules/broker.go | 40 +-
.../schedules/scheduler_algorithm_manager.go | 524 +++++++++++++++++++++
.../master/schedules/scheduler_cron_manager.go | 161 +++++++
.../master/schedules/scheduler_resource_manager.go | 258 ++++++++++
.../master/schedules/scheduler_task_manager.go | 291 ++++++++++++
vermeer/apps/master/services/http_tasks.go | 32 +-
vermeer/apps/master/services/router.go | 7 +-
vermeer/apps/master/workers/worker_manager.go | 4 +
vermeer/apps/structure/task.go | 6 +
vermeer/client/client.go | 70 +++
vermeer/client/structure.go | 13 +
vermeer/config/master.ini | 1 +
vermeer/config/{master.ini => worker04.ini} | 12 +-
vermeer/docker-compose.yaml | 46 ++
vermeer/go.mod | 1 +
vermeer/go.sum | 2 +
vermeer/test/functional/compute_base.go | 97 ++++
vermeer/test/functional/compute_task.go | 3 +
vermeer/test/functional/http_interface.go | 16 +
vermeer/test/functional/load_local.go | 28 ++
.../load_local.go => scheduler/batch.go} | 42 +-
vermeer/test/scheduler/priority.go | 366 ++++++++++++++
vermeer/test/scheduler/routine.go | 81 ++++
vermeer/test/scheduler/test_scheduler.go | 87 ++++
vermeer/vermeer_test.go | 9 +
38 files changed, 2667 insertions(+), 178 deletions(-)
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 779c4406..0539d2fe 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -32,10 +32,10 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
- name: Setup Java JDK
- uses: actions/setup-java@v3
+ uses: actions/setup-java@v4
with:
distribution: 'zulu'
java-version: '11'
@@ -48,7 +48,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
- uses: github/codeql-action/init@v2
+ uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a
config file.
@@ -59,7 +59,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
- uses: github/codeql-action/autobuild@v2
+ uses: github/codeql-action/autobuild@v3
# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -73,13 +73,13 @@ jobs:
# make release
- name: Perform CodeQL Analysis
- uses: github/codeql-action/analyze@v2
+ uses: github/codeql-action/analyze@v3
dependency-review:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
- name: 'Dependency Review'
- uses: actions/dependency-review-action@v3
+ uses: actions/dependency-review-action@v4
diff --git a/vermeer/.gitignore b/vermeer/.gitignore
index 540a67ae..53629789 100644
--- a/vermeer/.gitignore
+++ b/vermeer/.gitignore
@@ -83,3 +83,7 @@ node_modules/
/output/
/bin/*
!/bin/*.sh
+
+# Others #
+######################
+test/case/
diff --git a/vermeer/README.md b/vermeer/README.md
index 77695662..55ca14b0 100644
--- a/vermeer/README.md
+++ b/vermeer/README.md
@@ -3,6 +3,28 @@
## Introduction
Vermeer is a high-performance distributed graph computing platform based on memory, supporting more than 15 graph algorithms, custom algorithm extensions, and custom data source access.
+## Run with Docker
+
+Pull the image:
+```
+docker pull hugegraph/vermeer:latest
+```
+
Create local configuration files, for example, `~/master.ini` and `~/worker.ini`.
+
+Run with Docker. The `--env` flag specifies the file name.
+
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
We've also provided a `docker-compose` file. Once you've created `~/master.ini` and `~/worker.ini`, and updated the `master_peer` in `worker.ini` to `172.20.0.10:6689`, you can run it using the following command:
+
+```
+docker-compose up -d
+```
+
## Start
```
diff --git a/vermeer/README.zh-CN.md b/vermeer/README.zh-CN.md
index 34dcec04..1b125fa3 100644
--- a/vermeer/README.zh-CN.md
+++ b/vermeer/README.zh-CN.md
@@ -3,6 +3,26 @@
## 简介
Vermeer是一个基于内存的高性能分布式图计算平台,支持15+图算法。支持自定义算法扩展,支持自定义数据源接入。
+## 基于 Docker 运行
+
+拉取镜像
+```
+docker pull hugegraph/vermeer:latest
+```
+
+创建好本地配置文件,例如`~/master.ini`与`~/worker.ini`
+
+基于docker运行,其中`--env`指定的是文件名称。
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
+我们也提供了`docker-compose`文件,当创建好`~/master.ini`与`~/worker.ini`,将`worker.ini`中的`master_peer`修改为`172.20.0.10:6689`后,即可通过以下命令运行:
+```
+docker-compose up -d
+```
+
## 运行
```
diff --git a/vermeer/apps/graphio/local_file.go b/vermeer/apps/graphio/local_file.go
index 8cdb7262..7d611930 100644
--- a/vermeer/apps/graphio/local_file.go
+++ b/vermeer/apps/graphio/local_file.go
@@ -82,6 +82,7 @@ func (a *LocalMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPa
logrus.Errorf(s)
return nil, errors.New(s)
}
+	logrus.Debugf("MakeTask LoadTypeLocal parse file: %s, s:%d, e:%d", files, s, e)
for i := s; i <= e; i++ {
part := LoadPartition{}
part.Init(partID, taskID, LoadPartTypeVertex)
diff --git a/vermeer/apps/master/bl/compute_task.go b/vermeer/apps/master/bl/compute_task.go
index 80ca8052..e8abef04 100644
--- a/vermeer/apps/master/bl/compute_task.go
+++ b/vermeer/apps/master/bl/compute_task.go
@@ -142,6 +142,8 @@ func (ctb *ComputeTaskBl) ComputeTaskStatus(
}
}
taskMgr.ForceState(computeTask.Task, structure.TaskStateComplete)
+	// for scheduler, mark task complete
+	Scheduler.taskManager.MarkTaskComplete(taskId)
graph.SubUsingNum()
computeTask.FreeMemory()
needQuery := options.GetInt(computeTask.Task.Params, "output.need_query") == 1
diff --git a/vermeer/apps/master/bl/grpc_handlers.go b/vermeer/apps/master/bl/grpc_handlers.go
index e1c23558..2734c063 100644
--- a/vermeer/apps/master/bl/grpc_handlers.go
+++ b/vermeer/apps/master/bl/grpc_handlers.go
@@ -26,6 +26,7 @@ import (
"time"
"vermeer/apps/compute"
"vermeer/apps/graphio"
+ "vermeer/apps/master/schedules"
"vermeer/apps/master/threshold"
"vermeer/apps/master/workers"
pb "vermeer/apps/protos"
@@ -103,6 +104,11 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
logrus.Errorf("failed to add a WorkerClient to the WorkerManager, error: %s", err)
return &pb.HelloMasterResp{}, err
}
+	_, err = Scheduler.ChangeWorkerStatus(reqWorker.Name, schedules.WorkerOngoingStatusIdle)
+	if err != nil {
+		logrus.Errorf("failed to change worker status to idle, error: %s", err)
+		return &pb.HelloMasterResp{}, err
+	}
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
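A hedged sketch of how these status transitions are driven from the worker lifecycle: the status constants and method signatures below are the ones visible in this diff, while the wrapper functions and the `bl` package context are assumed for illustration.

```go
// Illustrative wiring only (sketch, not the committed code); error handling trimmed.
func onWorkerHello(name string) error {
	// a newly connected worker becomes schedulable
	_, err := Scheduler.ChangeWorkerStatus(name, schedules.WorkerOngoingStatusIdle)
	return err
}

func onWorkerOffline(name string, failedTaskID int32) error {
	// passing the worker name makes CloseCurrent also remove the worker
	// from the resource manager (see the worker_bl.go hunk below)
	return Scheduler.CloseCurrent(failedTaskID, name)
}
```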
diff --git a/vermeer/apps/master/bl/load_task.go b/vermeer/apps/master/bl/load_task.go
index 4e1f79f4..80c0023b 100644
--- a/vermeer/apps/master/bl/load_task.go
+++ b/vermeer/apps/master/bl/load_task.go
@@ -204,6 +204,9 @@ func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state string, workerName stri
loadTask.Task.SetState(structure.TaskStateLoaded)
//TaskMgr.ForceState(loadTask.Task, structure.TaskStateLoaded)
+ // for scheduler, mark task complete
+ Scheduler.taskManager.MarkTaskComplete(taskId)
+
logrus.Infof("graph: %s, vertex: %d, edge: %d",
graph.Name, graph.VertexCount, graph.EdgeCount)
for _, w := range graph.Workers {
logrus.Infof(
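Both the compute and load status handlers now end with the same hand-off: `MarkTaskComplete` tells the scheduler the task is done, which is what unblocks tasks listing it in `Preorders`. A minimal sketch of that idea, with simplified types assumed (the real manager is `schedules.SchedulerTaskManager`, shown later in this diff):

```go
// Sketch only, not the committed code: a simplified completion record.
// In this commit, CloseCurrent additionally releases the task's worker
// group and triggers TryScheduleNextTasks.
type miniTaskManager struct {
	complete map[int32]bool
}

// MarkTaskComplete records that taskID has finished.
func (m *miniTaskManager) MarkTaskComplete(taskID int32) {
	if m.complete == nil {
		m.complete = make(map[int32]bool)
	}
	m.complete[taskID] = true
}

// IsComplete is what a dependency filter would consult.
func (m *miniTaskManager) IsComplete(taskID int32) bool {
	return m.complete[taskID]
}
```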
diff --git a/vermeer/apps/master/bl/scheduler_bl.go b/vermeer/apps/master/bl/scheduler_bl.go
index bc4ccac4..95578949 100644
--- a/vermeer/apps/master/bl/scheduler_bl.go
+++ b/vermeer/apps/master/bl/scheduler_bl.go
@@ -19,6 +19,7 @@ package bl
import (
"errors"
+ "fmt"
"strconv"
"time"
"vermeer/apps/common"
@@ -28,16 +29,58 @@ import (
"github.com/sirupsen/logrus"
)
+/*
+* @Description: ScheduleBl is the scheduler business logic.
+* @Note: It coordinates the resource, algorithm, task, and cron managers.
+ */
type ScheduleBl struct {
structure.MutexLocker
- dispatchLocker structure.MutexLocker
- spaceQueue *schedules.SpaceQueue
- broker *schedules.Broker
- startChan chan *structure.TaskInfo
- isDispatchPaused bool
+ // resource management
+ resourceManager *schedules.SchedulerResourceManager
+ // algorithm management
+ algorithmManager *schedules.SchedulerAlgorithmManager
+ // task management
+ taskManager *schedules.SchedulerTaskManager
+ // cron management
+ cronManager *schedules.SchedulerCronManager
+ // start channel for tasks to be started
+ startChan chan *structure.TaskInfo
+ // configurations
+ startChanSize int
+ tickerInterval int
+ softSchedule bool
}
+/*
+* @Description: Init initializes the ScheduleBl.
+* @Note: It loads the configuration, creates the sub-managers, and starts the ticker and start-channel goroutines.
+ */
func (s *ScheduleBl) Init() {
+ logrus.Info("Initializing ScheduleBl...")
+ s.LoadConfig()
+ startChan := make(chan *structure.TaskInfo, s.startChanSize)
+ s.startChan = startChan
+
+ s.resourceManager = &schedules.SchedulerResourceManager{}
+ s.resourceManager.Init()
+ s.taskManager = &schedules.SchedulerTaskManager{}
+ s.taskManager.Init()
+ s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
+ s.algorithmManager.Init()
+ s.cronManager = &schedules.SchedulerCronManager{}
+ s.cronManager.Init(s.QueueTaskFromTemplate)
+ go s.startTicker()
+ go s.waitingStartedTask()
+}
+
+/*
+* @Description: LoadConfig loads the configuration from the common package.
+* @Note: It falls back to the defaults when a value is missing or invalid.
+ */
+func (s *ScheduleBl) LoadConfig() {
+ // Load configuration from common package
+
+ // startChanSize
const defaultChanSizeConfig = "10"
chanSize := common.GetConfigDefault("start_chan_size",
defaultChanSizeConfig).(string)
// Convert string to int
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
logrus.Infof("using default start_chan_size: %s",
defaultChanSizeConfig)
chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
}
- startChan := make(chan *structure.TaskInfo, chanSizeInt)
- s.startChan = startChan
- s.spaceQueue = (&schedules.SpaceQueue{}).Init()
- s.broker = (&schedules.Broker{}).Init()
+ s.startChanSize = chanSizeInt
- go s.waitingTask()
- go s.startTicker()
+ // tickerInterval
+ const defaultTickerInterval = "3"
+ tickerInterval := common.GetConfigDefault("ticker_interval",
defaultTickerInterval).(string)
+ tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+ if err != nil {
+ logrus.Errorf("failed to convert ticker_interval to int: %v",
err)
+ logrus.Infof("using default ticker_interval: %s",
defaultTickerInterval)
+ tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+ }
+ s.tickerInterval = tickerIntervalInt
+
+ // softSchedule
+ softSchedule := common.GetConfigDefault("soft_schedule",
"true").(string)
+ if softSchedule == "true" {
+ s.softSchedule = true
+ } else {
+ s.softSchedule = false
+ }
+
+	logrus.Infof("ScheduleBl configuration: startChanSize=%d, tickerInterval=%d, softSchedule=%v",
+		s.startChanSize, s.tickerInterval, s.softSchedule)
}
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
- return s.spaceQueue.PeekTailTask(space)
+/*
+* @Description: startTicker starts the ticker.
+* @Note: It triggers TryScheduleNextTasks every tickerInterval seconds.
+ */
+func (s *ScheduleBl) startTicker() {
+ // Create a ticker with the specified interval
+ ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+ for range ticker {
+ logrus.Debug("Ticker ticked")
+ s.TryScheduleNextTasks()
+ }
+}
+
+// Entry point that makes the scheduler try to schedule the next tasks.
+/*
+* @Description: TryScheduleNextTasks tries to schedule the next tasks.
+* @Note: It recovers from panics and delegates to tryScheduleInner.
+* @Param noLock
+ */
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+ defer func() {
+ if err := recover(); err != nil {
+			logrus.Errorln("TryScheduleNextTasks() has been recovered:", err)
+ }
+ }()
+
+ if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+ logrus.Errorf("do scheduling error:%v", err)
+ }
+}
+
+// Main routine to schedule tasks
+/*
+* @Description: tryScheduleInner tries to schedule the next tasks.
+* @Note: Runs one scheduling pass: map tasks to worker groups, gather idle/concurrent groups, rank, and dispatch.
+* @Param softSchedule
+* @Param noLock
+ */
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error {
+	// Take the scheduler lock unless the caller already holds it (noLock).
+	if !(len(noLock) > 0 && noLock[0]) {
+		defer s.Unlock(s.Lock())
+	}
+
+	// step 1: make sure every task has been allocated to a worker group.
+	// This is done by the TaskManager, which assigns a worker group to each task.
+ s.taskManager.RefreshTaskToWorkerGroupMap()
+
+ // step 2: get available resources and tasks
+ logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+ idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+ concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+ allTasks := s.taskManager.GetAllTasksNotComplete()
+	if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && len(concurrentWorkerGroups) == 0) {
+		logrus.Debugf("no available tasks or workerGroups, allTasks: %d, workerGroups: %d/%d",
+			len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))
+		return nil
+	}
+	logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), len(idleWorkerGroups), len(concurrentWorkerGroups))
+
+	// TODO: decide whether the task can run concurrently,
+	// not only by user setting but also by scheduler setting
+
+	// step 3: return the task with the highest priority, or small tasks which can be executed immediately
+	taskToWorkerGroupMap := s.taskManager.GetTaskToWorkerGroupMap()
+	nextTasks, err := s.algorithmManager.ScheduleNextTasks(allTasks, taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+ if err != nil {
+ logrus.Errorf("failed to schedule next tasks: %v", err)
+ return err
+ }
+ logrus.Debugf("scheduled %d tasks", len(nextTasks))
+ // step 4: send to start channel
+	for _, task := range nextTasks {
+		if task == nil {
+			logrus.Warnf("received a nil task from algorithm manager")
+			continue
+		}
+		if task.State != structure.TaskStateWaiting {
+			logrus.Warnf("task '%d' is not in waiting state, current state: %s", task.ID, task.State)
+			continue
+		}
+		logrus.Infof("scheduling task '%d' with type '%s' to start channel", task.ID, task.Type)
+		select {
+		case s.startChan <- task:
+			logrus.Infof("task '%d' sent to start channel", task.ID)
+		default:
+			errMsg := fmt.Sprintf("start channel is full, cannot schedule task %d", task.ID)
+			logrus.Errorf(errMsg)
+			taskMgr.SetError(task, errMsg)
+		}
+ }
+
+ return nil
}
// QueueTask Add the task to the inner queue.
-// The tasks will be executed in order from the queue.
// If the task exists, return false.
+/*
+* @Description: QueueTask adds the task to the inner queue and registers a
+* cron job when the task carries a valid cron expression.
+* @Param taskInfo
+* @Return bool, error
+ */
func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
if taskInfo == nil {
return false, errors.New("the argument `taskInfo` is nil")
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo
*structure.TaskInfo) (bool, error) {
		return false, errors.New("the property `SpaceName` of taskInfo is empty")
}
- //defer s.Unlock(s.Lock())
+ defer s.Unlock(s.Lock())
	if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err != nil {
return false, err
}
+ logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID,
taskInfo)
+
+ // check dependency if exists
+ if len(taskInfo.Preorders) > 0 {
+ for _, depTaskID := range taskInfo.Preorders {
+ depTask := taskMgr.GetTaskByID(depTaskID)
+ if depTask == nil {
+				err := errors.New("the dependency task with ID " + strconv.Itoa(int(depTaskID)) + " does not exist")
+ logrus.Error(err)
+ taskMgr.SetError(taskInfo, err.Error())
+ return false, err
+ }
+ }
+ }
+
// Notice: Ensure successful invocation.
- ok, err := s.spaceQueue.PushTask(taskInfo)
+	// make sure the task has been allocated to a worker group
+ ok, err := s.taskManager.QueueTask(taskInfo)
if err != nil {
taskMgr.SetError(taskInfo, err.Error())
return ok, err
}
- go s.dispatch()
+ if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+ if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+ logrus.Errorf("failed to add cron task: %v", err)
+ return false, err
+ }
+		logrus.Infof("added cron task for task '%d' with expression '%s'", taskInfo.ID, taskInfo.CronExpr)
+ }
return ok, nil
}
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
- if taskInfo == nil {
- return errors.New("the argument `taskInfo` is nil")
+/*
+* @Description: QueueTaskFromTemplate queues the task from the template.
+* @Note: This function will queue the task from the template. This function is
used by cron tasks.
+* @Param template
+* @Return int32, error
+ */
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo)
(int32, error) {
+ if template == nil {
+ return -1, errors.New("the argument `template` is nil")
}
- s.Lock()
- isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
- task := s.spaceQueue.RemoveTask(taskInfo.ID)
- s.Unlock(nil)
+ bc := &baseCreator{}
+ taskInfo, err := bc.CopyTaskInfo(template)
+ if err != nil {
+		logrus.Errorf("failed to copy task info from template, template ID: %d, caused by: %v", template.ID, err)
+ return -1, err
+ }
+ bc.saveTaskInfo(taskInfo)
- isInQueue := false
- if task != nil {
- logrus.Infof("removed task '%d' from space queue", task.ID)
- isInQueue = true
+ ok, err := s.QueueTask(taskInfo)
+ if err != nil || !ok {
+		logrus.Errorf("failed to queue task from template, template ID: %d, caused by: %v", template.ID, err)
+ return -1, err
}
- if isInQueue && !isHeadTask {
- if err := taskMgr.SetState(taskInfo,
structure.TaskStateCanceled); err != nil {
- return err
- }
+ logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID,
template.ID)
- logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
- } else {
- logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
- return s.handleCancelTask(taskInfo)
+ return taskInfo.ID, nil
+}
+
+/*
+* @Description: BatchQueueTask queues multiple tasks.
+* @Note: Dispatch is paused while the tasks are enqueued, then resumed.
+* @Param taskInfos
+* @Return []bool, []error
+ */
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool,
[]error) {
+ if len(taskInfos) == 0 {
+ return []bool{}, []error{}
}
- return nil
-}
+ s.PauseDispatch()
-func (s *ScheduleBl) IsDispatchPaused() bool {
- return s.isDispatchPaused
-}
-func (s *ScheduleBl) PauseDispatch() {
- s.isDispatchPaused = true
-}
+ defer s.ResumeDispatch()
+ // defer s.Unlock(s.Lock())
-func (s *ScheduleBl) ResumeDispatch() {
- s.isDispatchPaused = false
+	// zero length with capacity, so the appends below do not double the slices
+	errors := make([]error, 0, len(taskInfos))
+	oks := make([]bool, 0, len(taskInfos))
+
+ for _, taskInfo := range taskInfos {
+ ok, err := s.QueueTask(taskInfo)
+ if err != nil {
+ logrus.Errorf("failed to queue task '%d': %v",
taskInfo.ID, err)
+ }
+ errors = append(errors, err)
+ oks = append(oks, ok)
+ }
+
+ return oks, errors
}
-func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
- return s.spaceQueue.AllTasks()
+// ******** CloseCurrent ********
+func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string)
error {
+ defer s.Unlock(s.Lock())
+
+	// drop the finished task from the scheduler's tracking
+ s.taskManager.RemoveTask(taskId)
+ // release the worker group
+ s.resourceManager.ReleaseByTaskID(taskId)
+
+ if len(removeWorkerName) > 0 {
+		// stop the cron job, if any, when the worker must be removed;
+		// otherwise the task is just closed normally
+ s.cronManager.DeleteTask(taskId)
+ // remove the worker from resource manager
+ workerName := removeWorkerName[0]
+ if workerName == "" {
+			return errors.New("the argument `removeWorkerName` is empty")
+ }
+ logrus.Infof("removing worker '%s' from resource manager",
workerName)
+ s.ChangeWorkerStatus(workerName,
schedules.WorkerOngoingStatusDeleted)
+ }
+
+ logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
+ s.TryScheduleNextTasks(true)
+ return nil
}
-func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
- return s.spaceQueue.SpaceTasks(space)
+// This will be called when a worker goes online or offline.
+func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status
schedules.WorkerOngoingStatus) (bool, error) {
+ defer s.Unlock(s.Lock())
+ s.resourceManager.ChangeWorkerStatus(workerName, status)
+
+ logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
+ // After changing the worker status, we may need to reschedule tasks
+ s.TryScheduleNextTasks(true)
+
+ return true, nil
}
-func (s *ScheduleBl) CloseCurrent(taskId int32) error {
- logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
- s.dispatch()
+// ******** START TASK ********
+func (s *ScheduleBl) waitingStartedTask() {
+ for taskInfo := range s.startChan {
+ if taskInfo == nil {
+ logrus.Warnf("received a nil task from startChan")
+ continue
+ }
- return nil
+ logrus.Infof("chan received task '%d' to start", taskInfo.ID)
+ s.handleStartTask(taskInfo)
+ }
}
+// now, start task!
func (s *ScheduleBl) handleStartTask(taskInfo *structure.TaskInfo) {
- agent, status, err := s.broker.ApplyAgent(taskInfo)
+ agent, status, err := s.resourceManager.GetAgentAndAssignTask(taskInfo)
if err != nil {
logrus.Errorf("apply agent error: %v", err)
@@ -175,24 +411,6 @@ func (s *ScheduleBl) handleStartTask(taskInfo
*structure.TaskInfo) {
go s.startWaitingTask(agent, taskInfo)
}
-func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
- logrus.Infof("received task '%d' to cancel", taskInfo.ID)
- canceler, err := NewTaskCanceler(taskInfo)
- if err != nil {
- logrus.Errorf("failed to create new TaskCanceler err: %v", err)
- taskMgr.SetError(taskInfo, err.Error())
- return err
- }
-
- if err := canceler.CancelTask(); err != nil {
- logrus.Errorf("failed to cancel task '%d', caused by: %v",
taskInfo.ID, err)
- taskMgr.SetError(taskInfo, err.Error())
- return err
- }
-
- return nil
-}
-
func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo
*structure.TaskInfo) {
logrus.Infof("starting a task, id: %v, type: %v, graph: %v",
taskInfo.ID, taskInfo.Type, taskInfo.GraphName)
@@ -202,6 +420,7 @@ func (s *ScheduleBl) startWaitingTask(agent
*schedules.Agent, taskInfo *structur
}
}()
+	// TODO: Is a lock needed here? (TOCTTOU)
if taskInfo.State != structure.TaskStateWaiting {
		logrus.Errorf("task state is not in 'Waiting' state, taskID: %v", taskInfo)
return
@@ -222,68 +441,115 @@ func (s *ScheduleBl) startWaitingTask(agent
*schedules.Agent, taskInfo *structur
taskInfo.StartTime = time.Now()
err = taskStarter.StartTask()
+
+ // only for test or debug, record the task start sequence
+ if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil {
+ logrus.Errorf("failed to add task '%d' to start sequence: %v",
taskInfo.ID, err)
+ }
+
if err != nil {
		logrus.Errorf("failed to start a task, type: %s, taskID: %d, caused by: %v", taskInfo.Type, taskInfo.ID, err)
taskMgr.SetError(taskInfo, err.Error())
}
-
}
-func (s *ScheduleBl) dispatch() {
- defer func() {
- if err := recover(); err != nil {
- logrus.Errorln("dispatch() has been recovered:", err)
+// ********* CANCEL TASK ********
+// handle cancel task
+// need to cancel cron task
+func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
+ if taskInfo == nil {
+ return errors.New("the argument `taskInfo` is nil")
+ }
+
+ defer s.Unlock(s.Lock())
+
+ isHeadTask := s.taskManager.IsTaskOngoing(taskInfo.ID)
+ task := s.taskManager.RemoveTask(taskInfo.ID)
+ s.cronManager.DeleteTask(taskInfo.ID)
+ // err := s.taskManager.CancelTask(taskInfo)
+ isInQueue := false
+ if task != nil {
+		logrus.Infof("removed task '%d' from the task queue", taskInfo.ID)
+ isInQueue = true
+ }
+
+ if isInQueue && !isHeadTask {
+ if err := taskMgr.SetState(taskInfo,
structure.TaskStateCanceled); err != nil {
+ return err
}
- }()
- if err := s.doDispatch(); err != nil {
- logrus.Errorf("do dispatching error:%v", err)
+ logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
+ } else {
+ logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
+ return s.handleCancelTask(taskInfo)
}
+
+ return nil
}
-func (s *ScheduleBl) doDispatch() error {
- if s.isDispatchPaused {
- logrus.Warn("the dispatching was paused")
- return nil
+func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
+ logrus.Infof("received task '%d' to cancel", taskInfo.ID)
+ canceler, err := NewTaskCanceler(taskInfo)
+ if err != nil {
+ logrus.Errorf("failed to create new TaskCanceler err: %v", err)
+ taskMgr.SetError(taskInfo, err.Error())
+ return err
}
- defer s.dispatchLocker.Unlock(s.dispatchLocker.Lock())
-
- buffer := s.spaceQueue.HeadTasks()
- if len(buffer) == 0 {
- return nil
+ if err := canceler.CancelTask(); err != nil {
+ logrus.Errorf("failed to cancel task '%d', caused by: %v",
taskInfo.ID, err)
+ taskMgr.SetError(taskInfo, err.Error())
+ return err
}
- for _, task := range buffer {
- select {
- case s.startChan <- task:
- default:
- logrus.Warnf("the start channel is full, dropped task:
%d", task.ID)
- }
+ // set worker state to idle or concurrent running
+ s.resourceManager.ReleaseByTaskID(taskInfo.ID)
+
+ return nil
+}
+func (s *ScheduleBl) CancelCronTask(taskInfo *structure.TaskInfo) error {
+ if taskInfo == nil {
+ return errors.New("the argument `taskInfo` is nil")
}
+ s.cronManager.DeleteTask(taskInfo.ID)
+
return nil
}
-func (s *ScheduleBl) waitingTask() {
- for taskInfo := range s.startChan {
- if taskInfo == nil {
- logrus.Warnf("recieved a nil task from startChan")
- return
- }
+// ** Other Methods **
- logrus.Infof("chan received task '%d' to start", taskInfo.ID)
- s.handleStartTask(taskInfo)
- }
+func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
+ return s.taskManager.GetLastTask(space)
}
-func (s *ScheduleBl) startTicker() {
- // Create a ticker that triggers every 3 seconds
- ticker := time.Tick(3 * time.Second)
+func (s *ScheduleBl) IsDispatchPaused() bool {
+	return s.algorithmManager.IsDispatchPaused()
+}
- for range ticker {
- //logrus.Debug("Ticker ticked")
- s.dispatch()
- }
+func (s *ScheduleBl) PauseDispatch() {
+	s.algorithmManager.PauseDispatch()
+}
+
+func (s *ScheduleBl) ResumeDispatch() {
+	s.algorithmManager.ResumeDispatch()
+}
+
+func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
+	return s.taskManager.GetAllTasks()
+}
+
+func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
+	return s.taskManager.GetTasksInQueue(space)
+}
+
+func (s *ScheduleBl) TaskStartSequence(queryTasks []int32)
[]*structure.TaskInfo {
+ // Only for debug or test, get task start sequence
+ return s.taskManager.GetTaskStartSequence(queryTasks)
}
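Taken together, the rewritten ScheduleBl runs a periodic loop: the ticker (or an explicit trigger) calls TryScheduleNextTasks, tryScheduleInner picks winners, and waitingStartedTask drains startChan. A condensed sketch of one pass, assuming the manager methods shown in this diff (bodies simplified, not the committed code):

```go
// Condensed sketch of tryScheduleInner; assumes the bl package context.
func (s *ScheduleBl) scheduleOnce() error {
	defer s.Unlock(s.Lock())
	// 1. every queued task gets a worker group
	s.taskManager.RefreshTaskToWorkerGroupMap()
	// 2. gather free capacity and pending work
	idle := s.resourceManager.GetIdleWorkerGroups()
	concurrent := s.resourceManager.GetConcurrentWorkerGroups()
	tasks := s.taskManager.GetAllTasksNotComplete()
	// 3. filter (Depends, Waiting) then rank (PriorityElder)
	next, err := s.algorithmManager.ScheduleNextTasks(tasks,
		s.taskManager.GetTaskToWorkerGroupMap(), idle, concurrent, s.softSchedule)
	if err != nil {
		return err
	}
	// 4. non-blocking hand-off to the start channel
	for _, t := range next {
		select {
		case s.startChan <- t:
		default:
			// channel full: the committed code records an error on the task
		}
	}
	return nil
}
```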
diff --git a/vermeer/apps/master/bl/task_bl.go b/vermeer/apps/master/bl/task_bl.go
index fa3d5b58..1c7e43c9 100644
--- a/vermeer/apps/master/bl/task_bl.go
+++ b/vermeer/apps/master/bl/task_bl.go
@@ -20,7 +20,10 @@ package bl
import (
"errors"
"fmt"
+ "math"
"sort"
+ "strconv"
+ "strings"
"time"
"vermeer/apps/compute"
@@ -62,6 +65,56 @@ func (tb *TaskBl) CreateTaskInfo(
return nil, err
}
+ // for scheduler
+ taskInfo.Priority = 0
+ taskInfo.Preorders = make([]int32, 0)
+	taskInfo.Exclusive = true // default to true for now, can be set false by params
+ if params != nil {
+ if priority, ok := params["priority"]; ok {
+			if p, err := strconv.ParseInt(priority, 10, 32); err == nil {
+				if p < 0 {
+					return nil, fmt.Errorf("priority should be non-negative")
+				}
+				if p > math.MaxInt32 {
+					return nil, fmt.Errorf("priority exceeds maximum value: %d", math.MaxInt32)
+				}
+				taskInfo.Priority = int32(p)
+			} else {
+				logrus.Warnf("priority convert to int32 error:%v", err)
+				return nil, err
+			}
+		}
+		if preorders, ok := params["preorders"]; ok {
+			preorderList := strings.Split(preorders, ",")
+			for _, preorder := range preorderList {
+				if pid, err := strconv.ParseInt(preorder, 10, 32); err == nil {
+					if taskMgr.GetTaskByID(int32(pid)) == nil {
+						return nil, fmt.Errorf("preorder task with ID %d does not exist", pid)
+					}
+					taskInfo.Preorders = append(taskInfo.Preorders, int32(pid))
+				} else {
+					logrus.Warnf("preorder convert to int32 error:%v", err)
+					return nil, err
+				}
+			}
+		}
+		if exclusive, ok := params["exclusive"]; ok {
+			if ex, err := strconv.ParseBool(exclusive); err == nil {
+				taskInfo.Exclusive = ex
+			} else {
+				logrus.Warnf("exclusive convert to bool error:%v", err)
+				return nil, err
+			}
+		}
+		if cronExpr, ok := params["cron_expr"]; ok {
+			if err := Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil {
+				logrus.Warnf("cron_expr parse error:%v", err)
+				return nil, err
+			}
+			taskInfo.CronExpr = cronExpr
+		}
+ }
+
return taskInfo, nil
}
@@ -146,6 +199,9 @@ func (tb *TaskBl) CancelTask(taskID int32) error {
		return fmt.Errorf("cannot cancel the task with id '%v' as it was not created by you", taskID)
}
+ // stop the cron job if exists
+ Scheduler.CancelCronTask(task)
+
if task.State == structure.TaskStateCanceled {
return fmt.Errorf("task had been in state canceled")
}
@@ -206,13 +262,10 @@ func QueueExecuteTask(taskInfo *structure.TaskInfo) error {
}
func QueueExecuteTasks(tasksInfo []*structure.TaskInfo) []error {
- defer Scheduler.Unlock(Scheduler.Lock())
- errs := make([]error, 0, len(tasksInfo))
for _, task := range tasksInfo {
task.CreateType = structure.TaskCreateAsync
- _, err := Scheduler.QueueTask(task)
- errs = append(errs, err)
}
+ _, errs := Scheduler.BatchQueueTask(tasksInfo)
return errs
}
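For reference, the four scheduling-related keys parsed above all arrive through the task params map. A usage sketch (the keys and validation rules are the ones from this diff; the values and the surrounding program are illustrative):

```go
package main

import "fmt"

func main() {
	// Illustrative params for a scheduler-aware task. Per the validation above:
	// priority must fit a non-negative int32, every preorder ID must already
	// exist, exclusive must parse as a bool, and cron_expr must be a standard
	// cron spec accepted by the cron manager.
	params := map[string]string{
		"priority":  "10",        // larger values raise the emergency score
		"preorders": "101,102",   // comma-separated task IDs this task depends on
		"exclusive": "false",     // false lets the task share a concurrent worker group
		"cron_expr": "0 3 * * *", // optional: re-queue a copy of the task on schedule
	}
	fmt.Println(params)
}
```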
diff --git a/vermeer/apps/master/bl/task_creator.go b/vermeer/apps/master/bl/task_creator.go
index ca3f946c..a7353c85 100644
--- a/vermeer/apps/master/bl/task_creator.go
+++ b/vermeer/apps/master/bl/task_creator.go
@@ -99,6 +99,28 @@ func (bc *baseCreator) NewTaskInfo(graphName string, params
map[string]string, t
return task, nil
}
+func (bc *baseCreator) CopyTaskInfo(src *structure.TaskInfo) (*structure.TaskInfo, error) {
+ if src == nil {
+ return nil, fmt.Errorf("the argument `src` should not be nil")
+ }
+
+ task, err := taskMgr.CreateTask(src.SpaceName, src.Type, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ task.CreateType = structure.TaskCreateAsync
+ task.GraphName = src.GraphName
+ task.CreateUser = src.CreateUser
+ task.Params = src.Params
+ task.CronExpr = "" // clear cron expression for the new task
+ task.Priority = src.Priority
+ task.Preorders = src.Preorders
+ task.Exclusive = src.Exclusive
+
+ return task, nil
+}
+
func (bc *baseCreator) saveTaskInfo(task *structure.TaskInfo) (*structure.TaskInfo, error) {
	if _, err := taskMgr.AddTask(task); err != nil {
		logrus.Errorf("failed to add a task to `TaskManager`, task: %v, caused by: %v", task, err)
diff --git a/vermeer/apps/master/bl/worker_bl.go b/vermeer/apps/master/bl/worker_bl.go
index e14d20c4..651e1e04 100644
--- a/vermeer/apps/master/bl/worker_bl.go
+++ b/vermeer/apps/master/bl/worker_bl.go
@@ -70,7 +70,7 @@ func (wb *WorkerBl) ReleaseWorker(workerName string) error {
//taskInfo.SetErrMsg(fmt.Sprintf("worker %v is offline", workerName))
taskMgr.SetError(taskInfo, fmt.Sprintf("worker %v is offline", workerName))
logrus.Warnf("set task %v status:error", taskInfo.ID)
-	if err := Scheduler.CloseCurrent(taskInfo.ID); err != nil {
+	if err := Scheduler.CloseCurrent(taskInfo.ID, workerName); err != nil {
logrus.Errorf("failed to close task with ID: %d,err:%v", taskInfo.ID, err)
}
break
diff --git a/vermeer/apps/master/master_main.go b/vermeer/apps/master/master_main.go
index 2da20a2f..91cd3318 100644
--- a/vermeer/apps/master/master_main.go
+++ b/vermeer/apps/master/master_main.go
@@ -56,6 +56,7 @@ func Main() {
services.SetUI(sen)
logrus.Info("token-auth was activated")
default:
+ services.SetAdminRouters(sen, auth.NoneAuthFilter)
services.SetRouters(sen, auth.NoneAuthFilter)
logrus.Warn("No authentication was activated.")
}
diff --git a/vermeer/apps/master/schedules/broker.go b/vermeer/apps/master/schedules/broker.go
index fabdf415..435219dc 100644
--- a/vermeer/apps/master/schedules/broker.go
+++ b/vermeer/apps/master/schedules/broker.go
@@ -23,7 +23,7 @@ import (
"github.com/sirupsen/logrus"
- . "vermeer/apps/master/workers"
+ "vermeer/apps/master/workers"
)
type AgentStatus string
@@ -72,42 +72,44 @@ func (b *Broker) AllAgents() []*Agent {
return res
}
-func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo) (*Agent, AgentStatus, error) {
+func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo, forceApply ...bool) (*Agent, AgentStatus, map[string]*workers.WorkerClient, error) {
if taskInfo == nil {
-	return nil, AgentStatusError, fmt.Errorf("taskInfo is nil")
+	return nil, AgentStatusError, nil, fmt.Errorf("taskInfo is nil")
}
defer b.Unlock(b.Lock())
agent, workers, err := b.getAgent(taskInfo)
if err != nil {
- return nil, AgentStatusError, err
+ return nil, AgentStatusError, nil, err
}
if agent == nil {
- return nil, AgentStatusPending, nil
+ return nil, AgentStatusPending, nil, nil
}
if workers == nil || len(workers) == 0 {
- return nil, AgentStatusNoWorker, nil
+ return nil, AgentStatusNoWorker, nil, nil
}
if !b.isWorkersReady(workers) {
		logrus.Warnf("the workers of agent '%s' are not ready", agent.GroupName())
- return nil, AgentStatusWorkerNotReady, nil
+ return nil, AgentStatusWorkerNotReady, nil, nil
}
- if b.isAgentBusy(agent) {
- return nil, AgentStatusAgentBusy, nil
- }
+ if !(forceApply != nil && len(forceApply) > 0 && forceApply[0]) {
+ if b.isAgentBusy(agent) {
+ return nil, AgentStatusAgentBusy, nil, nil
+ }
- if b.isWorkerBusy(workers, agent) {
- return nil, AgentStatusWorkerBusy, nil
+ if b.isWorkerBusy(workers, agent) {
+ return nil, AgentStatusWorkerBusy, nil, nil
+ }
}
agent.AssignTask(taskInfo)
- return agent, AgentStatusOk, nil
+ return agent, AgentStatusOk, workers, nil
}
// func (b *Broker) isAgentReady(taskInfo *structure.TaskInfo, agent *Agent)
bool {
@@ -124,7 +126,7 @@ func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo)
(*Agent, AgentStatus,
// }
// }
-func (b *Broker) isWorkersReady(workers map[string]*WorkerClient) bool {
+func (b *Broker) isWorkersReady(workers map[string]*workers.WorkerClient) bool {
ok := false
for _, w := range workers {
if w.Connection == nil {
@@ -166,7 +168,7 @@ func (b *Broker) isAgentBusy(agent *Agent) bool {
return busy
}
-func (b *Broker) isWorkerBusy(workers map[string]*WorkerClient, agent *Agent) bool {
+func (b *Broker) isWorkerBusy(workers map[string]*workers.WorkerClient, agent *Agent) bool {
for _, a := range b.agents {
if a == agent {
continue
@@ -188,7 +190,7 @@ func (b *Broker) isWorkerBusy(workers
map[string]*WorkerClient, agent *Agent) bo
return false
}
-func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) {
+func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, map[string]*workers.WorkerClient, error) {
switch taskInfo.Type {
case structure.TaskTypeLoad:
fallthrough
@@ -202,7 +204,7 @@ func (b *Broker) getAgent(taskInfo *structure.TaskInfo)
(*Agent, map[string]*Wor
}
-func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) {
+func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, map[string]*workers.WorkerClient, error) {
graph := graphMgr.GetGraphByName(taskInfo.SpaceName, taskInfo.GraphName)
if graph == nil {
		return nil, nil, fmt.Errorf("failed to retrieve graph with name: %s/%s", taskInfo.SpaceName, taskInfo.GraphName)
@@ -223,7 +225,7 @@ func (b *Broker) getAgentFromGraph(taskInfo
*structure.TaskInfo) (*Agent, map[st
return nil, nil, nil // waiting for the next check
}
- workers := make(map[string]*WorkerClient)
+ workers := make(map[string]*workers.WorkerClient)
for _, w := range graph.Workers {
wc := workerMgr.GetWorker(w.Name)
@@ -238,7 +240,7 @@ func (b *Broker) getAgentFromGraph(taskInfo
*structure.TaskInfo) (*Agent, map[st
}
-func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, map[string]*WorkerClient, error) {
+func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, map[string]*workers.WorkerClient, error) {
group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
return b.retrieveAgent(group), workerMgr.GroupWorkerMap(group), nil
}
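Since ApplyAgent now returns the worker map and accepts an optional forceApply flag that bypasses the busy checks, callers switch on the returned status. A hedged caller-side sketch (status constants and signatures are from this file; the handling and the `startOn` helper are hypothetical):

```go
// Illustrative caller (sketch, not the committed code); assumes the
// schedules package context. Only AgentStatusOk means the task was assigned.
func tryStart(b *Broker, taskInfo *structure.TaskInfo) error {
	agent, status, workerMap, err := b.ApplyAgent(taskInfo) // ApplyAgent(taskInfo, true) would skip the busy checks
	if err != nil {
		return err
	}
	switch status {
	case AgentStatusOk:
		startOn(agent, workerMap) // hypothetical helper: launch on the ready workers
	case AgentStatusAgentBusy, AgentStatusWorkerBusy:
		// busy: retry on a later scheduler tick
	default:
		// Pending / NoWorker / WorkerNotReady: wait for the next check
	}
	return nil
}
```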
diff --git a/vermeer/apps/master/schedules/scheduler_algorithm_manager.go b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go
new file mode 100644
index 00000000..c65e9f4d
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go
@@ -0,0 +1,524 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+ "slices"
+ "sort"
+ "strconv"
+ "time"
+ "vermeer/apps/common"
+ "vermeer/apps/structure"
+
+ "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerAlgorithm is the interface for scheduler algorithms.
+* @Note: Implementations provide a filter stage and a scheduling stage.
+ */
+type SchedulerAlgorithm interface {
+ // Name returns the name of the SchedulerAlgorithm
+ Name() string
+ // Init initializes the SchedulerAlgorithm
+ Init()
+ // FilterNextTasks filters the next tasks to be scheduled based on the
provided parameters
+ FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap
map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string,
softSchedule bool) ([]*structure.TaskInfo, error)
+ // ScheduleNextTasks schedules the next tasks based on the filtered
tasks
+ ScheduleNextTasks(filteredTasks []*structure.TaskInfo,
taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string,
concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo,
error)
+}
+
+/*
+* @Description: SchedulerAlgorithmManager is the manager for scheduler algorithms.
+* @Note: It runs every registered filter algorithm, then a single scheduling algorithm.
+ */
+type SchedulerAlgorithmManager struct {
+ filteredSchedulerAlgorithms map[string]SchedulerAlgorithm
+ scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm
+ dispatchPaused bool
+}
+
+/*
+* @Description: Init initializes the SchedulerAlgorithmManager.
+* @Note: It registers the default filter and scheduling algorithms.
+ */
+// DependsSchedulerAlgorithm should filter before WaitingSchedulerAlgorithm
+// (note: the algorithms are stored in a map, whose iteration order Go does not guarantee)
+func (am *SchedulerAlgorithmManager) Init() {
+ am.filteredSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
+ am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
+ am.dispatchPaused = false
+ // Register filter and schedule algorithms
+ am.RegisterFilterAlgorithm(&DependsSchedulerAlgorithm{})
+ am.RegisterFilterAlgorithm(&WaitingSchedulerAlgorithm{})
+ // Register default SchedulerAlgorithms
+ am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{})
+}
+
+/*
+* @Description: RegisterSchedulerAlgorithm registers the scheduler algorithm.
+* @Note: Only one scheduling algorithm is supported for now.
+* @Param schedulerAlgorithm
+ */
+func (am *SchedulerAlgorithmManager)
RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) {
+ if schedulerAlgorithm == nil {
+ return
+ }
+ name := schedulerAlgorithm.Name()
+ if _, exists := am.scheduledSchedulerAlgorithms[name]; exists {
+ return // SchedulerAlgorithm already registered
+ }
+
+ // only support one scheduling algorithm for now
+ if len(am.scheduledSchedulerAlgorithms) > 0 {
+ return // Only one scheduling algorithm can be registered
+ }
+ schedulerAlgorithm.Init()
+ am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm
+}
+
+/*
+* @Description: RegisterFilterAlgorithm registers the filter algorithm.
+* @Note: Duplicate registrations are ignored.
+* @Param filterAlgorithm
+ */
+func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm
SchedulerAlgorithm) {
+ if filterAlgorithm == nil {
+ return
+ }
+ name := filterAlgorithm.Name()
+ if _, exists := am.filteredSchedulerAlgorithms[name]; exists {
+ return // SchedulerAlgorithm already registered
+ }
+ filterAlgorithm.Init()
+ am.filteredSchedulerAlgorithms[name] = filterAlgorithm
+}
+
+/*
+* @Description: IsDispatchPaused checks if the dispatch is paused.
+* @Note: This function will check if the dispatch is paused.
+* @Return bool
+ */
+func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool {
+ return am.dispatchPaused
+}
+
+/*
+* @Description: PauseDispatch pauses the dispatch.
+* @Note: This function will pause the dispatch.
+ */
+func (am *SchedulerAlgorithmManager) PauseDispatch() {
+ am.dispatchPaused = true
+}
+
+/*
+* @Description: ResumeDispatch resumes the dispatch.
+* @Note: This function will resume the dispatch.
+ */
+func (am *SchedulerAlgorithmManager) ResumeDispatch() {
+ am.dispatchPaused = false
+}
+
+/*
+* @Description: ScheduleNextTasks schedules the next tasks.
+* @Note: Applies all filter algorithms, then the single scheduling algorithm.
+* @Param allTasks
+* @Param taskToWorkerGroupMap
+* @Param idleWorkerGroups
+* @Param concurrentWorkerGroups
+* @Param softSchedule
+* @Return []*structure.TaskInfo, error
+ */
+// For all tasks, filter and schedule them
+// Only one scheduling algorithm is supported for now
+func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if am.dispatchPaused {
+ return nil, nil // No tasks to schedule if dispatch is paused
+ }
+
+ filteredTasks := allTasks
+ for _, algorithm := range am.filteredSchedulerAlgorithms {
+ var err error
+ filteredTasks, err = algorithm.FilterNextTasks(filteredTasks,
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+ if err != nil {
+ return nil, err
+ }
+ }
+ if len(filteredTasks) == 0 {
+ return nil, nil // No tasks to schedule after filtering
+ }
+
+ // only support one scheduling algorithm for now
+ // get first algorithm
+ for _, algorithm := range am.scheduledSchedulerAlgorithms {
+ tasks, err := algorithm.ScheduleNextTasks(filteredTasks,
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+ if err != nil {
+ return nil, err
+ }
+ return tasks, nil // Return the scheduled tasks
+ }
+
+ return nil, nil // No tasks scheduled
+}
+
+type FIFOSchedulerAlgorithm struct{}
+
+func (f *FIFOSchedulerAlgorithm) Name() string {
+ return "FIFO"
+}
+
+func (f *FIFOSchedulerAlgorithm) Init() {
+ // No specific initialization needed for FIFO
+ logrus.Info("Initializing FIFOSchedulerAlgorithm")
+}
+
+func (f *FIFOSchedulerAlgorithm) FilterNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ // just return the waiting tasks as is for FIFO
+ return allTasks, nil
+}
+
+func (f *FIFOSchedulerAlgorithm) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if len(allTasks) == 0 {
+ return nil, nil // No tasks to schedule
+ }
+
+	// For FIFO, we simply return the available tasks in the order they are provided
+	for _, task := range allTasks {
+		if task.State != structure.TaskStateWaiting {
+			continue // Only consider tasks that are in the waiting state
+		}
+		if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" {
+			// only support idle worker groups for now
+			for _, idleGroup := range idleWorkerGroups {
+				if group == idleGroup {
+					logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
+					return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+				}
+			}
+		}
+	}
+
+ return nil, nil
+}
+
+type PrioritySchedulerAlgorithm struct{}
+
+func (p *PrioritySchedulerAlgorithm) Name() string {
+ return "Priority"
+}
+
+func (p *PrioritySchedulerAlgorithm) Init() {
+ // No specific initialization needed for Priority
+ logrus.Info("Initializing PrioritySchedulerAlgorithm")
+}
+
+func (p *PrioritySchedulerAlgorithm) FilterNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ // just return the waiting tasks as is for Priority
+ return allTasks, nil
+}
+
+func (p *PrioritySchedulerAlgorithm) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if len(allTasks) == 0 {
+ return nil, nil // No tasks to schedule
+ }
+
+ // Sort tasks by priority (higher priority first)
+ sort.Slice(allTasks, func(i, j int) bool {
+ return allTasks[i].Priority > allTasks[j].Priority
+ })
+
+	for _, task := range allTasks {
+		if task.State != structure.TaskStateWaiting {
+			continue // Only consider tasks that are in the waiting state
+		}
+		if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" {
+			// only support idle worker groups for now
+			for _, idleGroup := range idleWorkerGroups {
+				if group == idleGroup {
+					logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
+					return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+				}
+			}
+		}
+	}
+
+ return nil, nil
+}
+
+type PriorityElderSchedulerAlgorithm struct {
+ ageParam int64
+ priorityParam int64
+ resourceParam int64
+ randomValueParam int64
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Name() string {
+ return "PriorityElder"
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Init() {
+ logrus.Info("Initializing PriorityElderSchedulerAlgorithm")
+
+ // Initialize parameters with default values
+ defaultAgeParam := "1"
+ defaultPriorityParam := "1"
+ defaultResourceParam := "10000000000"
+ defaultRandomValueParam := "1" // Placeholder for any random value logic
+
+ // Load parameters from configuration
+ ageParam := common.GetConfigDefault("priority_elder_age_param",
defaultAgeParam).(string)
+ priorityParam :=
common.GetConfigDefault("priority_elder_priority_param",
defaultPriorityParam).(string)
+ resourceParam :=
common.GetConfigDefault("priority_elder_resource_param",
defaultResourceParam).(string)
+ randomValueParam :=
common.GetConfigDefault("priority_elder_random_value_param",
defaultRandomValueParam).(string)
+
+ ageParamInt, err := strconv.Atoi(ageParam)
+ if err != nil {
+ logrus.Errorf("failed to convert priority_elder_age_param to
int: %v", err)
+ logrus.Infof("using default priority_elder_age_param: %s",
defaultAgeParam)
+ ageParamInt, _ = strconv.Atoi(defaultAgeParam)
+ }
+ p.ageParam = int64(ageParamInt)
+ priorityParamInt, err := strconv.Atoi(priorityParam)
+ if err != nil {
+ logrus.Errorf("failed to convert priority_elder_priority_param
to int: %v", err)
+ logrus.Infof("using default priority_elder_priority_param: %s",
defaultPriorityParam)
+ priorityParamInt, _ = strconv.Atoi(defaultPriorityParam)
+ }
+ p.priorityParam = int64(priorityParamInt)
+ resourceParamInt, err := strconv.Atoi(resourceParam)
+ if err != nil {
+ logrus.Errorf("failed to convert priority_elder_resource_param
to int: %v", err)
+ logrus.Infof("using default priority_elder_resource_param: %s",
defaultResourceParam)
+ resourceParamInt, _ = strconv.Atoi(defaultResourceParam)
+ }
+ p.resourceParam = int64(resourceParamInt)
+ randomValueParamInt, err := strconv.Atoi(randomValueParam)
+ if err != nil {
+ logrus.Errorf("failed to convert
priority_elder_random_value_param to int: %v", err)
+ logrus.Infof("using default priority_elder_random_value_param:
%s", defaultRandomValueParam)
+ randomValueParamInt, _ = strconv.Atoi(defaultRandomValueParam)
+ }
+ p.randomValueParam = int64(randomValueParamInt)
+
+	logrus.Infof("PriorityElderSchedulerAlgorithm initialized with parameters: ageParam=%d, priorityParam=%d, resourceParam=%d, randomValueParam=%d", p.ageParam, p.priorityParam, p.resourceParam, p.randomValueParam)
+}
+
+func (p *PriorityElderSchedulerAlgorithm) FilterNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ // just return the waiting tasks as is for PriorityElder
+ return allTasks, nil
+}
+
+func (p *PriorityElderSchedulerAlgorithm) CalculateTaskEmergency(task
*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, printValue bool)
int64 {
+ // step 0: get params
+ ageParam := p.ageParam
+ priorityParam := p.priorityParam
+ resourceParam := p.resourceParam
+ randomValueParam := p.randomValueParam
+ // step 1: age
+	ageCost := ageParam * time.Since(task.CreateTime).Milliseconds() / 1000 // in seconds
+ // step 2: priority
+ priorityCost := priorityParam * int64(task.Priority)
+ // step 3: resource cost
+ graph := structure.GraphManager.GetGraphByName(task.SpaceName,
task.GraphName)
+ resourceCost := int64(0)
+	if graph == nil {
+		resourceCost = resourceParam // if graph not found, use max resource cost
+	} else {
+		resourceCost = resourceParam / max(1, graph.VertexCount+graph.EdgeCount) // avoid division by zero
+	}
+	// step 4: some random value
+	randomValue := randomValueParam // placeholder for any random value logic
+	if printValue {
+		logrus.Debugf("Task %d: Age Cost: %d, Priority Cost: %d, Resource Cost: %d, Random Value: %d", task.ID, ageCost, priorityCost, resourceCost, randomValue)
+	}
+ return ageCost + priorityCost + resourceCost + randomValue
+}
+
+func (p *PriorityElderSchedulerAlgorithm) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if len(allTasks) == 0 {
+ return nil, nil // No tasks to schedule
+ }
+
+ // calculate emergency value for each task
+ taskEmergencies := make(map[int32]int64)
+ for _, task := range allTasks {
+ taskEmergencies[task.ID] = p.CalculateTaskEmergency(task,
taskToWorkerGroupMap, false)
+ }
+
+	// Sort tasks by emergency value (higher first)
+ sort.Slice(allTasks, func(i, j int) bool {
+ return taskEmergencies[allTasks[i].ID] >
taskEmergencies[allTasks[j].ID]
+ })
+
+ for _, task := range allTasks {
+ logrus.Debugf("Task %d: Emergency Value: %d", task.ID,
taskEmergencies[task.ID])
+ }
+
+	for _, task := range allTasks {
+		if task.State != structure.TaskStateWaiting {
+			continue // Only consider tasks that are in the waiting state
+		}
+		if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" {
+			for _, idleGroup := range idleWorkerGroups {
+				if group == idleGroup {
+					logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
+					return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+				}
+			}
+			// if concurrent running is allowed, check if the group is in concurrent worker groups
+			if !task.Exclusive {
+				for _, concurrentGroup := range concurrentWorkerGroups {
+					if group == concurrentGroup {
+						logrus.Debugf("Task %d is assigned to concurrent worker group %s", task.ID, group)
+						return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+					}
+				}
+			}
+		}
+	}
+
+ return nil, nil
+}
+
+type WaitingSchedulerAlgorithm struct{}
+
+func (w *WaitingSchedulerAlgorithm) Name() string {
+ return "Waiting"
+}
+
+func (w *WaitingSchedulerAlgorithm) Init() {
+ // No specific initialization needed for Waiting
+ logrus.Info("Initializing WaitingSchedulerAlgorithm")
+}
+
+func (w *WaitingSchedulerAlgorithm) FilterNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ waitingTasks := make([]*structure.TaskInfo, 0)
+ for _, task := range allTasks {
+ if task.State == structure.TaskStateWaiting {
+ waitingTasks = append(waitingTasks, task)
+ }
+ }
+ return waitingTasks, nil
+}
+
+func (w *WaitingSchedulerAlgorithm) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ waitingTasks, err := w.FilterNextTasks(allTasks, taskToWorkerGroupMap,
idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+ if err != nil {
+ return nil, err
+ }
+ if len(waitingTasks) == 0 {
+ return nil, nil
+ }
+	for _, task := range waitingTasks {
+		if task.State != structure.TaskStateWaiting {
+			continue // Only consider tasks that are in the waiting state
+		}
+		if group, exists := taskToWorkerGroupMap[task.ID]; exists && group != "" {
+			// only support idle worker groups for now
+			for _, idleGroup := range idleWorkerGroups {
+				if group == idleGroup {
+					logrus.Debugf("Task %d is assigned to worker group %s", task.ID, group)
+					return []*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+				}
+			}
+		}
+	}
+ return nil, nil // No tasks scheduled
+}
+
+type DependsSchedulerAlgorithm struct{}
+
+func (d *DependsSchedulerAlgorithm) Name() string {
+ return "Depends"
+}
+
+func (d *DependsSchedulerAlgorithm) Init() {
+ // No specific initialization needed for Depends
+ logrus.Info("Initializing DependsSchedulerAlgorithm")
+}
+
+func (d *DependsSchedulerAlgorithm) FilterNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if len(allTasks) == 0 {
+ return nil, nil // No tasks to schedule
+ }
+
+ sort.Slice(allTasks, func(i, j int) bool {
+ return allTasks[i].ID < allTasks[j].ID
+ })
+
+ taskIDs := make(map[int32]*structure.TaskInfo)
+ for _, task := range allTasks {
+ taskIDs[task.ID] = task
+ }
+
+ filteredTasks := make([]*structure.TaskInfo, 0)
+ for _, task := range allTasks {
+ depends := task.Preorders
+ // Check if all dependencies are satisfied
+ allDepsSatisfied := true
+ for _, dep := range depends {
+ if depTask, exists := taskIDs[dep]; exists &&
depTask.State != structure.TaskStateComplete {
+ allDepsSatisfied = false
+ break
+ }
+ }
+ if allDepsSatisfied {
+ if group, exists := taskToWorkerGroupMap[task.ID];
exists && group != "" {
+ filteredTasks = append(filteredTasks, task) //
Add to filtered tasks if dependencies are satisfied
+ }
+ }
+ }
+ return filteredTasks, nil
+}
+
+func (d *DependsSchedulerAlgorithm) ScheduleNextTasks(allTasks
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups
[]string, concurrentWorkerGroups []string, softSchedule bool)
([]*structure.TaskInfo, error) {
+ if len(allTasks) == 0 {
+ return nil, nil // No tasks to schedule
+ }
+
+ sort.Slice(allTasks, func(i, j int) bool {
+ return allTasks[i].ID < allTasks[j].ID
+ })
+
+ allTaskIDs := make(map[int32]*structure.TaskInfo)
+ for _, task := range allTasks {
+ allTaskIDs[task.ID] = task
+ }
+
+ for _, task := range allTasks {
+ depends := task.Preorders
+ // Check if all dependencies are satisfied
+ allDepsSatisfied := true
+ for _, dep := range depends {
+ if depTask, exists := allTaskIDs[dep]; exists &&
depTask.State != structure.TaskStateComplete {
+ allDepsSatisfied = false
+ break
+ }
+ }
+ if allDepsSatisfied {
+ if group, exists := taskToWorkerGroupMap[task.ID];
exists && group != "" {
+ // only support idle worker groups for now
+ if slices.Contains(idleWorkerGroups, group) {
+ logrus.Debugf("Task %d is assigned to
worker group %s", task.ID, group)
+ return []*structure.TaskInfo{task}, nil
// Return the first task that can be scheduled
+ }
+ }
+ }
+ }
+
+ return nil, nil
+}
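
For orientation, here is a minimal sketch of how a caller could chain these algorithms: dependency-aware selection first, with the plain waiting queue as a fallback. The helper name and the chaining order are illustrative assumptions; the actual wiring lives in scheduler_bl.go and the algorithm manager above.

    // pickNextTasks is a hypothetical helper chaining two of the algorithms
    // defined in this file; the first non-empty selection wins.
    func pickNextTasks(all []*structure.TaskInfo, groups map[int32]string,
        idle, concurrent []string) ([]*structure.TaskInfo, error) {
        algorithms := []interface {
            ScheduleNextTasks([]*structure.TaskInfo, map[int32]string,
                []string, []string, bool) ([]*structure.TaskInfo, error)
        }{
            &DependsSchedulerAlgorithm{},
            &WaitingSchedulerAlgorithm{},
        }
        for _, algo := range algorithms {
            tasks, err := algo.ScheduleNextTasks(all, groups, idle, concurrent, false)
            if err != nil {
                return nil, err
            }
            if len(tasks) > 0 {
                return tasks, nil
            }
        }
        return nil, nil // nothing schedulable this tick
    }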
diff --git a/vermeer/apps/master/schedules/scheduler_cron_manager.go
b/vermeer/apps/master/schedules/scheduler_cron_manager.go
new file mode 100644
index 00000000..651e1b9e
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_cron_manager.go
@@ -0,0 +1,161 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+ "errors"
+ "vermeer/apps/structure"
+
+ "github.com/robfig/cron/v3"
+ "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerCronManager manages cron-based (periodic) task scheduling.
+* @Note: It registers cron jobs that re-queue copies of template tasks via queueTemplateHandler.
+ */
+type SchedulerCronManager struct {
+	cronTasks map[int32][]*structure.TaskInfo // origin task ID to copied tasks
+	crons     map[int32][]*cron.Cron          // origin task ID to cron jobs
+ // queueTemplateHandler is a function that handles the task queue
+ queueTemplateHandler func(*structure.TaskInfo) (int32, error)
+}
+
+/*
+* @Description: Init initializes the SchedulerCronManager.
+* @Note: This function will initialize the SchedulerCronManager.
+* @Param queueTemplateHandler
+* @Return *SchedulerCronManager
+ */
+func (t *SchedulerCronManager) Init(queueTemplateHandler
func(*structure.TaskInfo) (int32, error)) *SchedulerCronManager {
+ t.cronTasks = make(map[int32][]*structure.TaskInfo)
+ t.crons = make(map[int32][]*cron.Cron)
+ t.queueTemplateHandler = queueTemplateHandler
+ return t
+}
+
+/*
+* @Description: CheckCronExpression checks the cron expression.
+* @Note: This function will check the cron expression.
+* @Param cronExpr
+* @Return error
+ */
+func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error {
+ if cronExpr == "" {
+ return errors.New("cron expression is empty")
+ }
+ if _, err := cron.ParseStandard(cronExpr); err != nil {
+ logrus.Errorf("Failed to parse cron expression: %v", err)
+ return errors.New("invalid cron expression: " + err.Error())
+ }
+ return nil
+}
+
+/*
+* @Description: AddCronTask adds the cron task.
+* @Note: This function will add the cron task.
+* @Param taskInfo
+* @Return error
+ */
+func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error
{
+ if taskInfo == nil {
+ return errors.New("the argument `taskInfo` is nil")
+ }
+
+ if taskInfo.CronExpr == "" {
+ return errors.New("the property `CronExpr` of taskInfo is
empty")
+ }
+
+ // add to cron tasks
+ cronJob := cron.New()
+ _, err := cronJob.AddFunc(taskInfo.CronExpr, func() {
+ if taskInfo == nil {
+ return
+ }
+
+		// Create a new task from the original (template) task; copying
+		// and enqueueing are handled by queueTemplateHandler (task_bl).
+ newID, err := t.queueTemplateHandler(taskInfo)
+ if err != nil {
+ logrus.Errorf("Failed to queue task %d in cron job:
%v", taskInfo.ID, err)
+ return
+ }
+ logrus.Infof("Successfully queued task %d from cron job", newID)
+ })
+ if err != nil {
+ logrus.Errorf("Failed to add cron job for task %d: %v",
taskInfo.ID, err)
+ return err
+ }
+ t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo)
+ t.crons[taskInfo.ID] = append(t.crons[taskInfo.ID], cronJob)
+ cronJob.Start()
+ logrus.Infof("Added cron task for task ID %d with expression %s",
taskInfo.ID, taskInfo.CronExpr)
+ return nil
+}
+
+/*
+* @Description: DeleteTask deletes the cron task.
+* @Note: This function will delete the cron task.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerCronManager) DeleteTask(taskID int32) error {
+ if _, exists := t.cronTasks[taskID]; !exists {
+ return errors.New("task not found in cron tasks")
+ }
+
+ for _, cronJob := range t.crons[taskID] {
+ cronJob.Stop()
+ }
+ delete(t.cronTasks, taskID)
+ delete(t.crons, taskID)
+ logrus.Infof("Deleted cron task for task ID %d", taskID)
+ return nil
+}
+
+/*
+* @Description: DeleteTaskByGraph deletes the cron task by graph.
+* @Note: This function will delete the cron task by graph.
+* @Param spaceName
+* @Param graphName
+* @Return error
+ */
+func (t *SchedulerCronManager) DeleteTaskByGraph(spaceName, graphName string)
error {
+ if spaceName == "" || graphName == "" {
+ return errors.New("the argument `spaceName` or `graphName` is
empty")
+ }
+
+ var toDelete []int32
+ for taskID, tasks := range t.cronTasks {
+ for _, task := range tasks {
+ if task.SpaceName == spaceName && task.GraphName ==
graphName {
+ toDelete = append(toDelete, taskID)
+ break
+ }
+ }
+ }
+
+ for _, taskID := range toDelete {
+ if err := t.DeleteTask(taskID); err != nil {
+ logrus.Errorf("Failed to delete cron task for task ID
%d: %v", taskID, err)
+ return err
+ }
+ }
+ logrus.Infof("Deleted cron tasks for space %s and graph %s", spaceName,
graphName)
+ return nil
+}
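
A usage sketch for the cron manager follows (hypothetical wiring; in this commit the real handler is installed by the scheduler). The queueTemplateHandler closure is expected to clone the template task, enqueue the copy, and return the new task's ID:

    cronMgr := new(SchedulerCronManager).Init(func(tmpl *structure.TaskInfo) (int32, error) {
        // Stub illustrating the contract only; real cloning/enqueueing is task_bl's job.
        copied := *tmpl
        copied.ID = tmpl.ID + 1 // illustrative; real IDs come from the task creator
        return copied.ID, nil
    })

    template := &structure.TaskInfo{ID: 42, CronExpr: "*/5 * * * *"} // fire every 5 minutes
    if err := cronMgr.CheckCronExpression(template.CronExpr); err != nil {
        logrus.Fatalf("bad cron expression: %v", err)
    }
    if err := cronMgr.AddCronTask(template); err != nil {
        logrus.Errorf("failed to register cron task: %v", err)
    }
    // later, e.g. when the graph is dropped:
    _ = cronMgr.DeleteTask(template.ID)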
diff --git a/vermeer/apps/master/schedules/scheduler_resource_manager.go
b/vermeer/apps/master/schedules/scheduler_resource_manager.go
new file mode 100644
index 00000000..8dabd3d0
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_resource_manager.go
@@ -0,0 +1,258 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+ "errors"
+ "vermeer/apps/structure"
+
+ "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: WorkerOngoingStatus represents a worker's current scheduling state.
+* @Note: It is also derived per worker group from the states of the group's workers.
+ */
+type WorkerOngoingStatus string
+
+const (
+ WorkerOngoingStatusIdle WorkerOngoingStatus = "idle"
+ WorkerOngoingStatusRunning WorkerOngoingStatus = "running"
+ WorkerOngoingStatusConcurrentRunning WorkerOngoingStatus =
"concurrent_running"
+ WorkerOngoingStatusPaused WorkerOngoingStatus = "paused"
+ WorkerOngoingStatusDeleted WorkerOngoingStatus = "deleted"
+)
+
+/*
+* @Description: SchedulerResourceManager tracks worker and worker-group availability.
+* @Note: It owns the status maps; worker communication is delegated to the Broker.
+ */
+type SchedulerResourceManager struct {
+ structure.MutexLocker
+ workerStatus map[string]WorkerOngoingStatus
+ workerGroupStatus map[string]WorkerOngoingStatus
+ runningWorkerGroupTasks map[string][]int32 // worker group name to list
of running task IDs
+ // broker just responsible for communication with workers
+ // it can not apply tasks to workers directly
+ broker *Broker
+}
+
+/*
+* @Description: Init initializes the SchedulerResourceManager.
+* @Note: This function will initialize the SchedulerResourceManager.
+ */
+func (rm *SchedulerResourceManager) Init() {
+ rm.workerStatus = make(map[string]WorkerOngoingStatus)
+ rm.workerGroupStatus = make(map[string]WorkerOngoingStatus)
+ rm.runningWorkerGroupTasks = make(map[string][]int32)
+ rm.broker = new(Broker).Init()
+}
+
+/*
+* @Description: ReleaseByTaskID releases the resource by task ID.
+* @Note: This function will release the resource by task ID.
+* @Param taskID
+ */
+func (rm *SchedulerResourceManager) ReleaseByTaskID(taskID int32) {
+ defer rm.Unlock(rm.Lock())
+
+ for workerGroup, status := range rm.workerGroupStatus {
+ if (status == WorkerOngoingStatusRunning || status ==
WorkerOngoingStatusConcurrentRunning) &&
rm.isTaskRunningOnWorkerGroup(workerGroup, taskID) {
+ delete(rm.workerGroupStatus, workerGroup)
+ if tasks, exists :=
rm.runningWorkerGroupTasks[workerGroup]; exists {
+ for i, id := range tasks {
+ if id == taskID {
+
rm.runningWorkerGroupTasks[workerGroup] = append(tasks[:i], tasks[i+1:]...)
+ if
len(rm.runningWorkerGroupTasks[workerGroup]) == 0 {
+
delete(rm.runningWorkerGroupTasks, workerGroup)
+ }
+ break
+ }
+ }
+ }
+ if tasks, exists :=
rm.runningWorkerGroupTasks[workerGroup]; !exists || len(tasks) == 0 {
+ for _, worker := range
workerMgr.GetGroupWorkers(workerGroup) {
+ rm.changeWorkerStatus(worker.Name,
WorkerOngoingStatusIdle)
+ }
+ } else {
+ for _, worker := range
workerMgr.GetGroupWorkers(workerGroup) {
+ rm.changeWorkerStatus(worker.Name,
WorkerOngoingStatusConcurrentRunning)
+ }
+ }
+ }
+ }
+}
+
+/*
+* @Description: isTaskRunningOnWorkerGroup checks if the task is running on
the worker group.
+* @Note: This function will check if the task is running on the worker group.
+* @Param workerGroup
+* @Param taskID
+* @Return bool
+ */
+func (rm *SchedulerResourceManager) isTaskRunningOnWorkerGroup(workerGroup
string, taskID int32) bool {
+ if tasks, exists := rm.runningWorkerGroupTasks[workerGroup]; exists {
+ for _, id := range tasks {
+ if id == taskID {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+/*
+* @Description: GetAgentAndAssignTask acquires an agent and assigns the task to it.
+* @Note: It also updates worker and worker-group statuses for the new assignment.
+* @Param taskInfo
+* @Return *Agent, AgentStatus, error
+ */
+func (rm *SchedulerResourceManager) GetAgentAndAssignTask(taskInfo
*structure.TaskInfo) (*Agent, AgentStatus, error) {
+ if taskInfo == nil {
+ return nil, AgentStatusError, errors.New("taskInfo is nil")
+ }
+
+ defer rm.Unlock(rm.Lock())
+
+ agent, status, workers, err := rm.broker.ApplyAgent(taskInfo,
!taskInfo.Exclusive)
+ if err != nil {
+ return nil, AgentStatusError, err
+ }
+ if agent == nil {
+ return nil, status, nil
+ }
+
+ // Assign the task to the agent
+ agent.AssignTask(taskInfo)
+
+ exclusive := taskInfo.Exclusive
+ runningStatus := WorkerOngoingStatusRunning
+ if _, exists := rm.runningWorkerGroupTasks[agent.GroupName()]; !exists
&& exclusive {
+ rm.runningWorkerGroupTasks[agent.GroupName()] = []int32{}
+ runningStatus = WorkerOngoingStatusRunning
+ rm.workerGroupStatus[agent.GroupName()] = runningStatus
+ } else {
+ runningStatus = WorkerOngoingStatusConcurrentRunning
+ rm.workerGroupStatus[agent.GroupName()] = runningStatus
+ }
+ rm.runningWorkerGroupTasks[agent.GroupName()] =
append(rm.runningWorkerGroupTasks[agent.GroupName()], taskInfo.ID)
+
+ for _, worker := range workers {
+ if worker == nil {
+ continue
+ }
+ rm.workerStatus[worker.Name] = runningStatus
+ }
+
+ return agent, status, nil
+}
+
+/*
+* @Description: GetIdleWorkerGroups gets the idle worker groups.
+* @Note: This function will get the idle worker groups.
+* @Return []string
+ */
+func (rm *SchedulerResourceManager) GetIdleWorkerGroups() []string {
+ defer rm.Unlock(rm.Lock())
+
+ idleWorkerGroups := make([]string, 0)
+ for workerGroup, status := range rm.workerGroupStatus {
+ if status == WorkerOngoingStatusIdle {
+ idleWorkerGroups = append(idleWorkerGroups, workerGroup)
+ }
+ }
+ return idleWorkerGroups
+}
+
+/*
+* @Description: GetConcurrentWorkerGroups gets the concurrent worker groups.
+* @Note: This function will get the concurrent worker groups.
+* @Return []string
+ */
+func (rm *SchedulerResourceManager) GetConcurrentWorkerGroups() []string {
+ defer rm.Unlock(rm.Lock())
+
+ concurrentWorkerGroups := make([]string, 0)
+ for workerGroup, status := range rm.workerGroupStatus {
+ if status == WorkerOngoingStatusConcurrentRunning {
+ concurrentWorkerGroups = append(concurrentWorkerGroups,
workerGroup)
+ }
+ }
+ return concurrentWorkerGroups
+}
+
+/*
+* @Description: changeWorkerStatus changes the worker status.
+* @Note: This function will change the worker status.
+* @Param workerName
+* @Param status
+ */
+func (rm *SchedulerResourceManager) changeWorkerStatus(workerName string,
status WorkerOngoingStatus) {
+ rm.workerStatus[workerName] = status
+
+ if status == WorkerOngoingStatusIdle || status ==
WorkerOngoingStatusConcurrentRunning {
+ workerInfo := workerMgr.GetWorkerInfo(workerName)
+
+ if workerInfo == nil {
+ logrus.Warnf("worker '%s' not found", workerName)
+ return
+ }
+
+ // get worker group name
+ groupName := workerInfo.Group
+ if groupName != "" {
+ gws := workerMgr.GetGroupWorkers(groupName)
+ allIdle := true
+ allConcurrent := true
+ for _, w := range gws {
+ st := rm.workerStatus[w.Name]
+ if st != WorkerOngoingStatusIdle {
+ allIdle = false
+ }
+ if st != WorkerOngoingStatusConcurrentRunning {
+ allConcurrent = false
+ }
+ }
+ if allConcurrent || allIdle {
+ newStatus := WorkerOngoingStatusIdle
+ if allConcurrent {
+ newStatus =
WorkerOngoingStatusConcurrentRunning
+ }
+ logrus.Debugf("Change worker group '%s' status
to '%s' (derived from %d workers)", groupName, newStatus, len(gws))
+ rm.changeWorkerGroupStatus(groupName, newStatus)
+ }
+ }
+
+ } else if status == WorkerOngoingStatusDeleted {
+ delete(rm.workerStatus, workerName)
+ }
+
+ // TODO: Other status changes can be handled here if needed
+}
+
+func (rm *SchedulerResourceManager) changeWorkerGroupStatus(workerGroup
string, status WorkerOngoingStatus) {
+ logrus.Infof("Change worker group '%s' status to '%s'", workerGroup,
status)
+ rm.workerGroupStatus[workerGroup] = status
+}
+
+// TODO: when sync task created, need to alloc worker?
+func (rm *SchedulerResourceManager) ChangeWorkerStatus(workerName string,
status WorkerOngoingStatus) {
+ defer rm.Unlock(rm.Lock())
+
+ rm.changeWorkerStatus(workerName, status)
+}
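
The intended lifecycle, sketched below under assumed caller wiring: workers start idle, an assignment flips the group to running (or concurrent_running for non-exclusive tasks), and releasing the task re-derives the group status from whatever is still running. The worker name and task ID are placeholders:

    rm := &SchedulerResourceManager{}
    rm.Init()

    // A worker registers and starts idle; group status is derived from its workers.
    rm.ChangeWorkerStatus("worker-1", WorkerOngoingStatusIdle)

    // Scheduling: GetAgentAndAssignTask applies for an agent via the broker and
    // marks the group running or concurrent_running.
    // agent, status, err := rm.GetAgentAndAssignTask(taskInfo)

    // Completion: releasing by task ID removes the task from the group's running
    // list and flips the group (and its workers) back to idle once the list drains.
    rm.ReleaseByTaskID(7)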
diff --git a/vermeer/apps/master/schedules/scheduler_task_manager.go
b/vermeer/apps/master/schedules/scheduler_task_manager.go
new file mode 100644
index 00000000..21767a39
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_task_manager.go
@@ -0,0 +1,291 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+ "errors"
+ "vermeer/apps/common"
+ "vermeer/apps/structure"
+
+ "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerTaskManager tracks queued tasks and their worker-group mapping.
+* @Note: It is the scheduler's bookkeeping layer; it does not execute tasks itself.
+ */
+type SchedulerTaskManager struct {
+ structure.MutexLocker
+ // This struct is responsible for managing tasks in the scheduling
system.
+ // A map from task ID to TaskInfo can be used to track tasks.
+ allTaskMap map[int32]*structure.TaskInfo
+ allTaskQueue []*structure.TaskInfo
+ // For debug or test, get task start sequence
+ startTaskQueue []*structure.TaskInfo
+ // onGoingTasks
+ notCompleteTasks map[int32]*structure.TaskInfo
+ // A map from task ID to worker group can be used to track which worker
group is handling which task.
+ taskToworkerGroupMap map[int32]string
+}
+
+/*
+* @Description: Init initializes the SchedulerTaskManager.
+* @Note: This function will initialize the SchedulerTaskManager.
+* @Return *SchedulerTaskManager
+ */
+func (t *SchedulerTaskManager) Init() *SchedulerTaskManager {
+ t.allTaskMap = make(map[int32]*structure.TaskInfo)
+ t.notCompleteTasks = make(map[int32]*structure.TaskInfo)
+ t.taskToworkerGroupMap = make(map[int32]string)
+ return t
+}
+
+/*
+* @Description: QueueTask queues the task.
+* @Note: This function will queue the task.
+* @Param taskInfo
+* @Return bool, error
+ */
+func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool,
error) {
+ if taskInfo == nil {
+ return false, errors.New("the argument `taskInfo` is nil")
+ }
+
+ if taskInfo.SpaceName == "" {
+ return false, errors.New("the property `SpaceName` of taskInfo
is empty")
+ }
+
+ defer t.Unlock(t.Lock())
+
+ // Add the task to the task map
+ t.allTaskMap[taskInfo.ID] = taskInfo
+ t.allTaskQueue = append(t.allTaskQueue, taskInfo)
+ t.notCompleteTasks[taskInfo.ID] = taskInfo
+ t.AssignGroup(taskInfo)
+ return true, nil
+}
+
+/*
+* @Description: RefreshTaskToWorkerGroupMap refreshes the task to worker group
map.
+* @Note: This function will refresh the task to worker group map.
+ */
+func (t *SchedulerTaskManager) RefreshTaskToWorkerGroupMap() {
+ defer t.Unlock(t.Lock())
+
+ for _, taskInfo := range t.GetAllTasksNotComplete() {
+ if taskInfo == nil {
+ continue
+ }
+ t.AssignGroup(taskInfo)
+ t.taskToworkerGroupMap[taskInfo.ID] =
workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+ }
+}
+
+// Only for debug or test, get task start sequence
+/*
+* @Description: AddTaskStartSequence adds the task start sequence.
+* @Note: This function will add the task start sequence.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) AddTaskStartSequence(taskID int32) error {
+ if common.GetConfig("debug_mode").(string) != "debug" {
+ logrus.Warn("TaskStartSequence called but debug features are
disabled")
+ return nil
+ }
+ if _, exists := t.allTaskMap[taskID]; !exists {
+ return errors.New("task not found")
+ }
+ t.startTaskQueue = append(t.startTaskQueue, t.allTaskMap[taskID])
+ return nil
+}
+
+/*
+* @Description: RemoveTask removes the task.
+* @Note: This function will remove the task.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) RemoveTask(taskID int32) error {
+ if _, exists := t.allTaskMap[taskID]; !exists {
+ return errors.New("task not found")
+ }
+ defer t.Unlock(t.Lock())
+ delete(t.allTaskMap, taskID)
+ // remove from queue
+ for i, task := range t.allTaskQueue {
+ if task.ID == taskID {
+ t.allTaskQueue = append(t.allTaskQueue[:i],
t.allTaskQueue[i+1:]...)
+ break
+ }
+ }
+ delete(t.taskToworkerGroupMap, taskID)
+ delete(t.notCompleteTasks, taskID)
+ return nil
+}
+
+/*
+* @Description: MarkTaskComplete marks the task complete.
+* @Note: This function will mark the task complete.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) MarkTaskComplete(taskID int32) error {
+ if _, exists := t.allTaskMap[taskID]; !exists {
+ return errors.New("task not found")
+ }
+ defer t.Unlock(t.Lock())
+ delete(t.notCompleteTasks, taskID)
+ return nil
+}
+
+// resolve and record the worker group for the given task
+/*
+* @Description: AssignGroup assigns the group.
+* @Note: This function will assign the group.
+* @Param taskInfo
+* @Return error
+ */
+func (t *SchedulerTaskManager) AssignGroup(taskInfo *structure.TaskInfo) error
{
+ group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+ if group == "" {
+ return errors.New("failed to assign group for task")
+ }
+ t.taskToworkerGroupMap[taskInfo.ID] = group
+ return nil
+}
+
+/*
+* @Description: GetTaskByID gets the task by ID.
+* @Note: This function will get the task by ID.
+* @Param taskID
+* @Return *structure.TaskInfo, error
+ */
+func (t *SchedulerTaskManager) GetTaskByID(taskID int32) (*structure.TaskInfo,
error) {
+ task, exists := t.allTaskMap[taskID]
+ if !exists {
+ return nil, errors.New("task not found")
+ }
+ return task, nil
+}
+
+/*
+* @Description: GetLastTask gets the last task.
+* @Note: This function will get the last task.
+* @Param spaceName
+* @Return *structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetLastTask(spaceName string)
*structure.TaskInfo {
+	// Scan the queue backwards for the most recently queued task in the given space
+ if len(t.allTaskQueue) == 0 {
+ return nil
+ }
+ for i := len(t.allTaskQueue) - 1; i >= 0; i-- {
+ if t.allTaskQueue[i].SpaceName == spaceName {
+ return t.allTaskQueue[i]
+ }
+ }
+ return nil
+}
+
+/*
+* @Description: GetAllTasks gets all tasks.
+* @Note: This function will get all tasks.
+* @Return []*structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetAllTasks() []*structure.TaskInfo {
+ tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+ for _, task := range t.allTaskMap {
+ tasks = append(tasks, task)
+ }
+ return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksNotComplete() []*structure.TaskInfo {
+ tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+ for _, task := range t.notCompleteTasks {
+ tasks = append(tasks, task)
+ }
+ return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksWaiting() []*structure.TaskInfo {
+ tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+ for _, task := range t.GetAllTasksNotComplete() {
+ if task.State == structure.TaskStateWaiting {
+ tasks = append(tasks, task)
+ }
+ }
+ return tasks
+}
+
+func (t *SchedulerTaskManager) GetTasksInQueue(space string)
[]*structure.TaskInfo {
+ tasks := make([]*structure.TaskInfo, 0)
+ for _, task := range t.GetAllTasksNotComplete() {
+ if task.SpaceName == space {
+ tasks = append(tasks, task)
+ }
+ }
+ return tasks
+}
+
+// Only for debug or test, get task start sequence
+func (t *SchedulerTaskManager) GetTaskStartSequence(queryTasks []int32)
[]*structure.TaskInfo {
+ if len(t.startTaskQueue) == 0 {
+ return nil
+ }
+ if len(queryTasks) == 0 {
+ return t.startTaskQueue
+ }
+ tasks := make([]*structure.TaskInfo, 0, len(queryTasks))
+ taskSet := make(map[int32]struct{})
+ for _, id := range queryTasks {
+ taskSet[id] = struct{}{}
+ }
+ for _, task := range t.startTaskQueue {
+ if _, exists := taskSet[task.ID]; exists {
+ tasks = append(tasks, task)
+ }
+ }
+ logrus.Infof("GetTaskStartSequence: return %d tasks", len(tasks))
+ for _, task := range tasks {
+ logrus.Debugf("TaskID: %d", task.ID)
+ }
+ return tasks
+}
+
+func (t *SchedulerTaskManager) GetTaskToWorkerGroupMap() map[int32]string {
+ // Return a copy of the worker group map to avoid external modifications
+ taskNotComplete := t.GetAllTasksNotComplete()
+ groupMap := make(map[int32]string, len(taskNotComplete))
+ for _, task := range taskNotComplete {
+ if group, exists := t.taskToworkerGroupMap[task.ID]; exists {
+ groupMap[task.ID] = group
+ }
+ }
+ return groupMap
+}
+
+func (t *SchedulerTaskManager) IsTaskOngoing(taskID int32) bool {
+ // Check if the task is currently ongoing
+ task, exists := t.allTaskMap[taskID]
+ if !exists {
+ return false
+ }
+ return task.State == structure.TaskStateCreated
+}
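
A minimal bookkeeping sketch (assumes workerMgr is initialized so AssignGroup can resolve a group; the task values are illustrative):

    tm := new(SchedulerTaskManager).Init()

    task := &structure.TaskInfo{ID: 7, SpaceName: "default", GraphName: "g1"}
    if _, err := tm.QueueTask(task); err != nil {
        logrus.Errorf("queue failed: %v", err)
    }

    // While running, the task stays in the not-complete set; completion removes
    // it from that set but keeps it in allTaskMap for later lookups.
    _ = tm.MarkTaskComplete(task.ID)

    // RemoveTask erases it everywhere: map, queue, and worker-group mapping.
    _ = tm.RemoveTask(task.ID)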
diff --git a/vermeer/apps/master/services/http_tasks.go
b/vermeer/apps/master/services/http_tasks.go
index 03130ca8..de4c5eae 100644
--- a/vermeer/apps/master/services/http_tasks.go
+++ b/vermeer/apps/master/services/http_tasks.go
@@ -112,7 +112,7 @@ func handleTaskCreation(ctx *gin.Context, exeFunc
func(*structure.TaskInfo) erro
}
filteredTask := taskBiz(ctx).FilteringTask(task)
- ctx.JSON(http.StatusOK, TaskResp{Task: taskInfo2TaskJson(filteredTask)})
+ ctx.JSON(http.StatusOK, TaskCreateResponse{Task:
taskInfo2TaskJson(filteredTask)})
}
type TaskCreateBatchHandler struct {
@@ -231,3 +231,33 @@ func (ch *ComputeValueHandler) GET(ctx *gin.Context) {
ctx.JSON(http.StatusOK, resp)
}
+
+type TaskStartSequenceHandler struct {
+ common.SenHandler
+}
+
+type TaskStartSequenceRequest struct {
+ QueryTasks []int32 `json:"query_tasks,omitempty"`
+}
+
+type TaskStartSequenceResp struct {
+ common.BaseResp
+ Sequence []int32 `json:"sequence,omitempty"`
+}
+
+func (tsh *TaskStartSequenceHandler) POST(ctx *gin.Context) {
+ req := TaskStartSequenceRequest{}
+ err := ctx.BindJSON(&req)
+ if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request
body not correct: %s", err) }) {
+ return
+ }
+
+ r := Scheduler.TaskStartSequence(req.QueryTasks)
+
+	sequence := make([]int32, 0, len(r))
+ for _, task := range r {
+ sequence = append(sequence, int32(task.ID))
+ }
+
+ ctx.JSON(http.StatusOK, TaskStartSequenceResp{Sequence: sequence,
BaseResp: common.BaseResp{ErrCode: 0, Message: "ok"}})
+}
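
Assuming the master's default HTTP address from the bundled config, querying the recorded start sequence looks roughly like this (the exact URL prefix depends on the version registered via regVerAPI in router.go below, and the response field casing follows the BaseResp JSON tags; the sequence is only recorded when debug_mode=debug):

    curl -X POST http://127.0.0.1:6688/tasks/start_sequence \
         -d '{"query_tasks": [1, 2, 3]}'
    # => {"errcode": 0, "message": "ok", "sequence": [2, 1, 3]}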
diff --git a/vermeer/apps/master/services/router.go
b/vermeer/apps/master/services/router.go
index e1000d25..ac01e834 100644
--- a/vermeer/apps/master/services/router.go
+++ b/vermeer/apps/master/services/router.go
@@ -35,12 +35,13 @@ func SetRouters(sen *common.Sentinel, authFilters
...gin.HandlerFunc) {
// /tasks
regVerAPI(sen, 1, "/tasks", map[string]common.BaseHandler{
- "": &TasksHandler{},
- "/create": &TaskCreateHandler{},
- // "/create/batch": &TaskCreateBatchHandler{},
+ "": &TasksHandler{},
+ "/create": &TaskCreateHandler{},
+ "/create/batch": &TaskCreateBatchHandler{},
"/create/sync": &TaskCreateSyncHandler{},
"/oltp": &OltpHandler{},
"/value/:task_id": &ComputeValueHandler{},
+ "/start_sequence": &TaskStartSequenceHandler{},
}, authFilters...)
// /task
diff --git a/vermeer/apps/master/workers/worker_manager.go
b/vermeer/apps/master/workers/worker_manager.go
index bfaec0b8..f2ea2fb6 100644
--- a/vermeer/apps/master/workers/worker_manager.go
+++ b/vermeer/apps/master/workers/worker_manager.go
@@ -577,6 +577,10 @@ func (wm *workerManager) getGroupWorkers(workerGroup
string) []*WorkerClient {
return workers
}
+func (wm *workerManager) GetGroupWorkers(workerGroup string) []*WorkerClient {
+ return wm.getGroupWorkers(workerGroup)
+}
+
func (wm *workerManager) getGroupWorkerMap(workerGroup string)
map[string]*WorkerClient {
workerMap := make(map[string]*WorkerClient)
diff --git a/vermeer/apps/structure/task.go b/vermeer/apps/structure/task.go
index 87356f2b..eb000069 100644
--- a/vermeer/apps/structure/task.go
+++ b/vermeer/apps/structure/task.go
@@ -55,6 +55,12 @@ type TaskInfo struct {
wg *sync.WaitGroup
Action int32
StatisticsResult map[string]any
+
+ // for scheduler
+ Priority int32
+ Preorders []int32
+ Exclusive bool // whether the task can be executed concurrently with
other tasks
+ CronExpr string // cron expression for scheduling
}
func (ti *TaskInfo) SetState(state TaskState) {
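
The tests added later in this commit set these fields through string params at task-creation time; for instance (values illustrative, keys as used in test/scheduler):

    params := map[string]string{
        "priority":  "1",           // higher value is scheduled first (see SubTestPriority)
        "preorders": "41",          // ID of a task that must complete before this one
        "exclusive": "false",       // allow concurrent execution on the worker group
        "cron_expr": "*/5 * * * *", // re-queue a copy of the task on this schedule
    }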
diff --git a/vermeer/client/client.go b/vermeer/client/client.go
index d9d1a17f..34553aa3 100644
--- a/vermeer/client/client.go
+++ b/vermeer/client/client.go
@@ -150,6 +150,26 @@ func (vc *VermeerClient) GetWorkers() (*WorkersResponse,
error) {
return workersResp, err
}
+func (vc *VermeerClient) AllocGroupGraph(graphName string, groupName string)
(bool, error) {
+ reader, err := Request2Reader(struct{}{})
+ if err != nil {
+ return false, err
+ }
+ resp, err :=
vc.post(vc.httpAddr+"/admin/workers/alloc/"+groupName+"/$DEFAULT/"+graphName,
reader)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ respByte, err := ParseResponse2Byte(resp)
+ if err != nil {
+ return false, err
+ }
+ return false, fmt.Errorf("response:%s", string(respByte))
+ }
+ return true, nil
+}
+
func (vc *VermeerClient) GetMaster() (*MasterResponse, error) {
resp, err := vc.get(vc.httpAddr + "/master")
if err != nil {
@@ -228,6 +248,31 @@ func (vc *VermeerClient) CreateTaskSync(request
TaskCreateRequest) (*TaskRespons
return taskResponse, err
}
+func (vc *VermeerClient) CreateTaskBatch(request TaskCreateBatchRequest)
(*TaskBatchCreateResponse, error) {
+ reader, err := Request2Reader(request)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := vc.post(vc.httpAddr+"/tasks/create/batch", reader)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ respByte, err := ParseResponse2Byte(resp)
+ if err != nil {
+ return nil, err
+ }
+ return nil, fmt.Errorf("response:%s", string(respByte))
+ }
+ taskResp := &TaskBatchCreateResponse{}
+ err = ParseResponse2Any(resp, taskResp)
+ if err != nil {
+ return nil, err
+ }
+ return taskResp, err
+}
+
func (vc *VermeerClient) GetTasks() (*TasksResponse, error) {
resp, err := vc.get(vc.httpAddr + "/tasks")
if err != nil {
@@ -270,6 +315,31 @@ func (vc *VermeerClient) GetTask(taskID int)
(*TaskResponse, error) {
return taskResp, err
}
+func (vc *VermeerClient) GetTaskStartSequence(queryTasks []int32)
(*TaskStartSequenceResp, error) {
+ reader, err := Request2Reader(TaskStartSequenceRequest{QueryTasks:
queryTasks})
+ if err != nil {
+ return nil, err
+ }
+ resp, err := vc.post(vc.httpAddr+"/tasks/start_sequence", reader)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ respByte, err := ParseResponse2Byte(resp)
+ if err != nil {
+ return nil, err
+ }
+ return nil, fmt.Errorf("response:%s", string(respByte))
+ }
+ taskResp := &TaskStartSequenceResp{}
+ err = ParseResponse2Any(resp, taskResp)
+ if err != nil {
+ return nil, err
+ }
+ return taskResp, err
+}
+
func (vc *VermeerClient) GetEdges(graphName string, vertexID string, direction
string) (*EdgesResponse, error) {
resp, err := vc.get(vc.httpAddr + "/graphs/" + graphName +
"/edges?vertex_id=" + vertexID + "&direction=" + direction)
if err != nil {
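
Putting the new client APIs together, a caller-side flow might look like this sketch (vc is an initialized *VermeerClient; construction and imports elided, field names as in structure.go below):

    batch := client.TaskCreateBatchRequest{
        {TaskType: "compute", GraphName: "g1", Params: map[string]string{"priority": "1"}},
        {TaskType: "compute", GraphName: "g1", Params: map[string]string{"priority": "0"}},
    }
    resp, err := vc.CreateTaskBatch(batch)
    if err != nil {
        return err
    }
    ids := make([]int32, 0, len(*resp))
    for _, r := range *resp {
        ids = append(ids, r.Task.ID)
    }
    // Requires debug_mode=debug on the master for the sequence to be recorded.
    seq, err := vc.GetTaskStartSequence(ids)
    if err != nil {
        return err
    }
    fmt.Println("observed start order:", seq.Sequence)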
diff --git a/vermeer/client/structure.go b/vermeer/client/structure.go
index ceeead39..ee233b42 100644
--- a/vermeer/client/structure.go
+++ b/vermeer/client/structure.go
@@ -38,6 +38,17 @@ type TaskResponse struct {
Task TaskInfo `json:"task,omitempty"`
}
+type TaskStartSequenceRequest struct {
+ QueryTasks []int32 `json:"query_tasks,omitempty"`
+}
+
+type TaskStartSequenceResp struct {
+ BaseResponse
+ Sequence []int32 `json:"sequence,omitempty"`
+}
+
+type TaskBatchCreateResponse []TaskResponse
+
type TaskInfo struct {
ID int32 `json:"id,omitempty"`
Status string `json:"status,omitempty"`
@@ -161,6 +172,8 @@ type TaskCreateRequest struct {
Params map[string]string `json:"params"`
}
+type TaskCreateBatchRequest []TaskCreateRequest
+
type GraphCreateRequest struct {
Name string `json:"name,omitempty"`
}
diff --git a/vermeer/config/master.ini b/vermeer/config/master.ini
index 8a7adb13..34f1859c 100644
--- a/vermeer/config/master.ini
+++ b/vermeer/config/master.ini
@@ -25,3 +25,4 @@ task_parallel_num=1
auth=none
auth_token_factor=1234
start_chan_size=10
+ticker_interval=1
diff --git a/vermeer/config/master.ini b/vermeer/config/worker04.ini
similarity index 85%
copy from vermeer/config/master.ini
copy to vermeer/config/worker04.ini
index 8a7adb13..8b341c8f 100644
--- a/vermeer/config/master.ini
+++ b/vermeer/config/worker04.ini
@@ -16,12 +16,8 @@
[default]
log_level=info
debug_mode=release
-http_peer=0.0.0.0:6688
-grpc_peer=0.0.0.0:6689
+http_peer=0.0.0.0:6988
+grpc_peer=0.0.0.0:6989
master_peer=127.0.0.1:6689
-run_mode=master
-task_strategy=1
-task_parallel_num=1
-auth=none
-auth_token_factor=1234
-start_chan_size=10
+run_mode=worker
+worker_group=test
\ No newline at end of file
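
This fourth-worker config mirrors master.ini. Based on the --env flags used in the docker-compose file below, it would presumably be picked up by starting a worker with an env name of worker04; that mapping is an assumption, not part of this diff:

    ./vermeer --env=worker04   # assumption: --env=<name> selects config/<name>.ini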
diff --git a/vermeer/docker-compose.yaml b/vermeer/docker-compose.yaml
new file mode 100644
index 00000000..2cf90f6a
--- /dev/null
+++ b/vermeer/docker-compose.yaml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3.8'
+
+services:
+ vermeer-master:
+ image: hugegraph/vermeer
+ container_name: vermeer-master
+ volumes:
+ - ~/:/go/bin/config # Change here to your actual config path
+ command: --env=master
+ networks:
+ vermeer_network:
+ ipv4_address: 172.20.0.10 # Assign a static IP for the master
+
+ vermeer-worker:
+ image: hugegraph/vermeer
+ container_name: vermeer-worker
+ volumes:
+ - ~/:/go/bin/config # Change here to your actual config path
+ command: --env=worker
+ networks:
+ vermeer_network:
+ ipv4_address: 172.20.0.11 # Assign a static IP for the worker
+
+networks:
+ vermeer_network:
+ driver: bridge
+ ipam:
+ config:
+ - subnet: 172.20.0.0/24 # Define the subnet for your network
\ No newline at end of file
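
With the config directory mounted as above (adjust the ~/ volume to your actual config path), the pair can be brought up with:

    docker compose -f vermeer/docker-compose.yaml up -d
    # master at 172.20.0.10, worker at 172.20.0.11 on the vermeer_network bridge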
diff --git a/vermeer/go.mod b/vermeer/go.mod
index 0ba852b4..7d854284 100644
--- a/vermeer/go.mod
+++ b/vermeer/go.mod
@@ -72,6 +72,7 @@ require (
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a
// indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
+ github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c //
indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
diff --git a/vermeer/go.sum b/vermeer/go.sum
index c30b27ba..74351602 100644
--- a/vermeer/go.sum
+++ b/vermeer/go.sum
@@ -395,6 +395,8 @@ github.com/prometheus/procfs v0.1.3/go.mod
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3
h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/robfig/cron/v3 v3.0.0
h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
+github.com/robfig/cron/v3 v3.0.0/go.mod
h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.3.0/go.mod
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod
h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
diff --git a/vermeer/test/functional/compute_base.go
b/vermeer/test/functional/compute_base.go
index 256c7294..745fd7a5 100644
--- a/vermeer/test/functional/compute_base.go
+++ b/vermeer/test/functional/compute_base.go
@@ -97,6 +97,103 @@ func (ctb *ComputeTaskBase) SendComputeReqAsync(params
map[string]string) {
require.Equal(ctb.t, "complete", taskResp.Task.Status)
}
+/*
+* @Description: SendComputeReqAsyncNotWait sends a compute request
asynchronously and returns the task ID.
+* @Param params
+* @Return int32
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWait(params
map[string]string) int32 {
+ //create Compute Task
+ resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{
+ TaskType: "compute",
+ GraphName: ctb.graphName,
+ Params: params,
+ })
+ require.NoError(ctb.t, err)
+ return int32(resp.Task.ID)
+}
+
+/*
+* @Description: SendComputeReqAsyncNotWaitWithError sends a compute request
asynchronously and returns the task ID and error.
+* @Param params
+* @Return int32, error
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWaitWithError(params
map[string]string) (int32, error) {
+ //create Compute Task
+ resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{
+ TaskType: "compute",
+ GraphName: ctb.graphName,
+ Params: params,
+ })
+ if err != nil {
+ return -1, err
+ }
+ return int32(resp.Task.ID), nil
+}
+
+/*
+* @Description: SendComputeReqAsyncBatchPriority sends a batch of compute requests and returns the task IDs and their observed start sequence.
+* @Note: This function will block the main thread until all tasks are
completed.
+* @Param params
+* @Return []int32, []int32
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncBatchPriority(params
[]map[string]string) ([]int32, []int32) {
+ //create Compute Task
+ tasks := make([]client.TaskInfo, 0, len(params))
+ taskIds := make([]int32, 0, len(params))
+ createTasksParams := client.TaskCreateBatchRequest{}
+ for i := 0; i < len(params); i++ {
+ graph := ctb.graphName
+ if params[i]["graph_name"] != "" {
+ graph = params[i]["graph_name"]
+ }
+ createTasksParams = append(createTasksParams,
client.TaskCreateRequest{
+ TaskType: "compute",
+ GraphName: graph,
+ Params: params[i],
+ })
+ }
+ resp, err := ctb.masterHttp.CreateTaskBatch(createTasksParams)
+ require.NoError(ctb.t, err)
+
+ for i, r := range *resp {
+ if r.BaseResponse.ErrCode != 0 {
+ ctb.t.Fatalf("create compute task %d failed: %s", i,
r.BaseResponse.Message)
+ }
+ tasks = append(tasks, r.Task)
+ taskIds = append(taskIds, r.Task.ID)
+ }
+
+ for i := 0; i < len(params); i++ {
+ ctb.taskID = int(tasks[i].ID)
+		// After the compute task starts, poll GetTask and break once the status is "complete".
+ var taskResp *client.TaskResponse
+ var err error
+ for attempt := 0; attempt < ctb.waitSecond; attempt++ {
+ ctb.healthCheck.DoHealthCheck()
+ taskResp, err = ctb.masterHttp.GetTask(ctb.taskID)
+ require.NoError(ctb.t, err)
+ if taskResp.Task.Status == "complete" {
+ break
+ }
+ require.NotEqual(ctb.t, "error", taskResp.Task.Status)
+ time.Sleep(1 * time.Second)
+ }
+ require.Equal(ctb.t, "complete", taskResp.Task.Status)
+ fmt.Printf("Compute Task %d completed successfully\n",
ctb.taskID)
+ }
+
+ response, err := ctb.masterHttp.GetTaskStartSequence(taskIds)
+ require.NoError(ctb.t, err)
+ sequence := response.Sequence
+ for i, id := range sequence {
+ fmt.Printf("Task %d started at position %d in the sequence\n",
id, i+1)
+ }
+ require.Equal(ctb.t, len(taskIds), len(sequence))
+
+ return taskIds, sequence
+}
+
// SendComputeReqSync
//
// @Description: send the HTTP request synchronously; no override needed
diff --git a/vermeer/test/functional/compute_task.go
b/vermeer/test/functional/compute_task.go
index 14ee5a52..08d65fdc 100644
--- a/vermeer/test/functional/compute_task.go
+++ b/vermeer/test/functional/compute_task.go
@@ -36,6 +36,9 @@ type ComputeTask interface {
masterHttp *client.VermeerClient, t *testing.T, healthCheck
*HealthCheck)
TaskComputeBody() map[string]string
SendComputeReqAsync(params map[string]string)
+ SendComputeReqAsyncNotWait(params map[string]string) int32
+ SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32,
error)
+ SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32,
[]int32)
SendComputeReqSync(params map[string]string)
LoadComputeRes() ([]interface{}, error)
CheckRes()
diff --git a/vermeer/test/functional/http_interface.go
b/vermeer/test/functional/http_interface.go
index 2869db14..199261e8 100644
--- a/vermeer/test/functional/http_interface.go
+++ b/vermeer/test/functional/http_interface.go
@@ -76,6 +76,22 @@ func (ct CancelTask) CancelTask(t *testing.T, master
*client.VermeerClient, grap
require.Equal(t, "canceled", task.Task.Status)
}
+/*
+* @Description: DirectCancelTask cancels a task directly.
+* @Param t
+* @Param master
+* @Param taskID
+ */
+func (ct CancelTask) DirectCancelTask(t *testing.T, master
*client.VermeerClient, taskID int32) {
+ ok, err := master.GetTaskCancel(int(taskID))
+ require.NoError(t, err)
+ require.Equal(t, true, ok)
+
+ task, err := master.GetTask(int(taskID))
+ require.NoError(t, err)
+ require.Equal(t, "canceled", task.Task.Status)
+}
+
type GetGraphs struct {
}
diff --git a/vermeer/test/functional/load_local.go
b/vermeer/test/functional/load_local.go
index 8da57f04..52575bb1 100644
--- a/vermeer/test/functional/load_local.go
+++ b/vermeer/test/functional/load_local.go
@@ -21,6 +21,9 @@ package functional
import (
"math/rand"
+ "strconv"
+
+ "github.com/sirupsen/logrus"
)
type LoadTaskLocal struct {
@@ -43,3 +46,28 @@ func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
"load.vertex_backend":
vertexBackends[rand.Intn(len(vertexBackends))],
}
}
+
+// TaskLoadBodyWithNum creates load configuration with specified number of
files.
+// If num <= 10, it will be automatically adjusted to 30 to ensure minimum
test coverage.
+func (lt *LoadTaskLocal) TaskLoadBodyWithNum(num int) map[string]string {
+ vertexBackends := []string{"db", "mem"}
+
+ if num <= 10 {
+ num = 30
+ }
+
+	logrus.Infof("load with num: %d", num-1)
+
+ return map[string]string{
+ "load.parallel": "100",
+ "load.type": "local",
+ "load.use_property": "0",
+ //"load.use_outedge": "1",
+ //"load.use_out_degree": "1",
+ //"load.use_undirected": "0",
+ "load.delimiter": " ",
+ "load.vertex_files": "{\"127.0.0.1\":\"" +
"test/case/vertex/vertex_[0," + strconv.Itoa(num-1) + "]" + "\"}",
+ "load.edge_files": "{\"127.0.0.1\":\"" +
"test/case/edge/edge_[0," + strconv.Itoa(num-1) + "]" + "\"}",
+ "load.vertex_backend":
vertexBackends[rand.Intn(len(vertexBackends))],
+ }
+}
diff --git a/vermeer/test/functional/load_local.go
b/vermeer/test/scheduler/batch.go
similarity index 52%
copy from vermeer/test/functional/load_local.go
copy to vermeer/test/scheduler/batch.go
index 8da57f04..e2d6611e 100644
--- a/vermeer/test/functional/load_local.go
+++ b/vermeer/test/scheduler/batch.go
@@ -1,5 +1,3 @@
-//go:build vermeer_test
-
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
@@ -17,29 +15,27 @@ License for the specific language governing permissions and
limitations
under the License.
*/
-package functional
+package scheduler
import (
- "math/rand"
+ "testing"
+ "vermeer/client"
+ "vermeer/test/functional"
)
-type LoadTaskLocal struct {
- LoadTaskTestBase
-}
-
-func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
- vertexBackends := []string{"db", "mem"}
-
- return map[string]string{
- "load.parallel": "100",
- "load.type": "local",
- "load.use_property": "0",
- //"load.use_outedge": "1",
- //"load.use_out_degree": "1",
- //"load.use_undirected": "0",
- "load.delimiter": " ",
- "load.vertex_files": "{\"127.0.0.1\":\"" +
"test/case/vertex/vertex_[0,29]" + "\"}",
- "load.edge_files": "{\"127.0.0.1\":\"" +
"test/case/edge/edge_[0,29]" + "\"}",
- "load.vertex_backend":
vertexBackends[rand.Intn(len(vertexBackends))],
- }
+/*
+* @Description: This is the main test function for batch.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param factor
+* @Param waitSecond
+ */
+func TestBatch(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
factor string, waitSecond int) {
+ // TEST GROUP: BATCH
+ // 1. send batch tasks to single graph
+ // expect: the tasks should be executed in order of time
+ // have been tested in priority.go
}
diff --git a/vermeer/test/scheduler/priority.go
b/vermeer/test/scheduler/priority.go
new file mode 100644
index 00000000..f15da6c9
--- /dev/null
+++ b/vermeer/test/scheduler/priority.go
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "vermeer/apps/structure"
+ "vermeer/client"
+ "vermeer/test/functional"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+)
+
+/*
+* @Description: SubTestPriority tests the scheduler's behavior when submitting
tasks with different priorities.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestPriority(t *testing.T, expectRes *functional.ExpectRes,
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient,
graphName []string, computeTask string, waitSecond int) {
+ fmt.Printf("Test Priority start with task: %s\n", computeTask)
+ bTime := time.Now()
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBody := computeTest.TaskComputeBody()
+
+ // send two tasks with different priority
+ params := make([]map[string]string, 0)
+
+ for i := 0; i < 2; i++ {
+ param := make(map[string]string)
+ param["priority"] = fmt.Sprintf("%d", i)
+ for k, v := range taskComputeBody {
+ param[k] = v
+ }
+ params = append(params, param)
+ }
+
+ logrus.Infof("params for priority test: %+v", params)
+
+ taskids, sequence :=
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests
asynchronously with priority
+
+ require.Equal(t, 2, len(sequence))
+ for i := 0; i < 2; i++ {
+ require.Equal(t, taskids[1-i], sequence[i]) // expect task with
priority 1 executed before priority 0
+ }
+
+ computeTest.CheckRes()
+ fmt.Printf("Test Priority: %-30s [OK], cost: %v\n", computeTask,
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestSmall tests the scheduler's behavior when submitting
tasks with different sizes.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestSmall(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
computeTask string, waitSecond int) {
+ fmt.Printf("Test Small start with task: %s\n", computeTask)
+ bTime := time.Now()
+	computeTest, err := functional.MakeComputeTask(computeTask)
+	require.NoError(t, err)
+	computeTaskSmall, err := functional.MakeComputeTask(computeTask)
+	require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBody := computeTest.TaskComputeBody()
+ computeTaskSmall.Init(graphName[1], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBodySmall := computeTaskSmall.TaskComputeBody()
+
+ // send two tasks with different size
+ params := make([]map[string]string, 0)
+ taskComputeBody["graph_name"] = graphName[0]
+ taskComputeBodySmall["graph_name"] = graphName[1]
+ params = append(params, taskComputeBody)
+ params = append(params, taskComputeBodySmall)
+
+ logrus.Infof("params for small test: %+v", params)
+
+ taskids, sequence :=
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests
asynchronously with priority
+
+ require.Equal(t, 2, len(sequence))
+ for i := 0; i < 2; i++ {
+		require.Equal(t, taskids[1-i], sequence[i]) // expect the smaller task to start before the larger one
+ }
+
+ computeTest.CheckRes()
+ fmt.Printf("Test Small: %-30s [OK], cost: %v\n", computeTask,
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestConcurrent tests the scheduler's behavior when submitting concurrent (non-exclusive) tasks.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestConcurrent(t *testing.T, expectRes *functional.ExpectRes,
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient,
graphName []string, computeTask string, waitSecond int) {
+ fmt.Printf("Test Concurrent start with task: %s\n", computeTask)
+ bTime := time.Now()
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[1], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBody := computeTest.TaskComputeBody()
+
+	// send two identical non-exclusive tasks
+ params := make([]map[string]string, 0)
+ // default is false, actually do not need to set
+ taskComputeBody["exclusive"] = "false"
+ params = append(params, taskComputeBody)
+ params = append(params, taskComputeBody)
+
+ logrus.Infof("params for concurrent test: %+v", params)
+
+ _, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) //
send multiple requests asynchronously with priority
+
+ require.Equal(t, 2, len(sequence))
+
+ fmt.Printf("Test Concurrent: %-30s [OK], cost: %v\n", computeTask,
time.Since(bTime))
+ // cost should be less than 2 * single task time
+}
+
+/*
+* @Description: SubTestDepends tests the scheduler's behavior when submitting
tasks with different dependencies.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestDepends(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
computeTask string, waitSecond int) {
+ fmt.Printf("Test Depends start with task: %s\n", computeTask)
+ bTime := time.Now()
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBody := computeTest.TaskComputeBody()
+
+	// first allocate the "test" worker group (worker04) for the "_3" graph
+	ok, err := masterHttp.AllocGroupGraph(graphName[0]+"_3", "test")
+	require.NoError(t, err)
+	require.True(t, ok)
+
+ loadTest3 := functional.LoadTaskLocal{}
+ loadTest3.Init(graphName[0]+"_3", expectRes, masterHttp, waitSecond, t,
healthCheck)
+ loadTest3.SendLoadRequest(loadTest3.TaskLoadBodyWithNum(10))
+
+	// send a large task to the $DEFAULT worker group
+ taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody)
+
+ // send two tasks with different dependency to the same graph
+ taskComputeBody["graph_name"] = graphName[0] + "_3"
+ params := make([]map[string]string, 0)
+ new_body := make(map[string]string)
+ for k, v := range taskComputeBody {
+ new_body[k] = v
+ }
+ new_body["preorders"] = fmt.Sprintf("%d", taskid)
+ params = append(params, new_body)
+ params = append(params, taskComputeBody)
+
+ logrus.Infof("params for depends test: %+v", params)
+
+ taskids, sequence :=
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests
asynchronously with priority
+
+ require.Equal(t, 2, len(sequence))
+ for i := 0; i < 2; i++ {
+		require.Equal(t, taskids[1-i], sequence[i]) // expect the task without a dependency to start first
+ }
+
+ // computeTest.CheckRes()
+ fmt.Printf("Test Depends: %-30s [OK], cost: %v\n", computeTask,
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestInvalidDependency tests the scheduler's behavior when a
compute task is submitted with a dependency on a non-existent (invalid) task ID.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestInvalidDependency(t *testing.T, expectRes *functional.ExpectRes,
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient,
graphName []string, computeTask string, waitSecond int) {
+ fmt.Printf("Test Invalid Dependency start with task: %s\n", computeTask)
+ bTime := time.Now()
+
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+
+ taskBody := computeTest.TaskComputeBody()
+ // set preorders to a very large, theoretically nonexistent task ID
+ invalidTaskID := 999999999
+ taskBody["preorders"] = fmt.Sprintf("%d", invalidTaskID)
+
+ logrus.Infof("Attempting to submit a task with invalid dependency on
ID: %d", invalidTaskID)
+
+ // try to submit task asynchronously and check if it returns an error
+ taskID, err := computeTest.SendComputeReqAsyncNotWaitWithError(taskBody)
+
+ // assert that the submission operation failed
+ require.Error(t, err, "Submitting a task with a non-existent dependency
should return an error.")
+	// assert that the returned task ID is the failure sentinel
+	require.Equal(t, int32(-1), taskID, "The task ID should be -1 on failure.")
+
+ fmt.Printf("Test Invalid Dependency: %-30s [OK], cost: %v\n",
computeTask, time.Since(bTime))
+}
+
+/*
+* @Description: SubTestConcurrentCancellation tests the scheduler's behavior
when submitting tasks concurrently and canceling them.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestConcurrentCancellation(t *testing.T, expectRes
*functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp
*client.VermeerClient, graphName []string, computeTask string, waitSecond int) {
+ fmt.Printf("Test Concurrent Cancellation start with task: %s\n",
computeTask)
+ bTime := time.Now()
+
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+
+ // set task number
+ const numTasks = 20
+ taskBodies := make([]map[string]string, numTasks)
+ for i := 0; i < numTasks; i++ {
+ taskBodies[i] = computeTest.TaskComputeBody()
+ }
+
+ taskIDs := make(chan int32, numTasks)
+ var wg sync.WaitGroup
+
+ // 1. submit tasks concurrently
+ for i := 0; i < numTasks; i++ {
+ wg.Add(1)
+ go func(body map[string]string) {
+ defer wg.Done()
+ taskID := computeTest.SendComputeReqAsyncNotWait(body)
+ if taskID != 0 {
+ taskIDs <- taskID
+ } else {
+				logrus.Error("Failed to submit task: got task ID 0")
+ }
+ }(taskBodies[i])
+ }
+
+ wg.Wait()
+ close(taskIDs)
+
+ submittedTaskIDs := make([]int32, 0, numTasks)
+ for id := range taskIDs {
+ submittedTaskIDs = append(submittedTaskIDs, id)
+ }
+
+ logrus.Infof("Submitted %d tasks concurrently: %+v",
len(submittedTaskIDs), submittedTaskIDs)
+ require.Equal(t, numTasks, len(submittedTaskIDs), "Not all tasks were
successfully submitted.")
+
+ cancelTask := functional.CancelTask{}
+ cancelTask.DirectCancelTask(t, masterHttp,
submittedTaskIDs[len(submittedTaskIDs)-1])
+
+ // 3. verify task status
+ // wait for tasks to settle
+ logrus.Info("Waiting for tasks to settle...")
+ time.Sleep(time.Duration(waitSecond) * time.Second)
+
+ checkTask, err := masterHttp.GetTask(int(submittedTaskIDs[numTasks-1]))
+
+ require.NoError(t, err, "Error fetching task status after
cancellation.")
+ require.NotNil(t, checkTask, "Task should exist after cancellation.")
+
+	if structure.TaskState(checkTask.Task.Status) != structure.TaskStateCanceled {
+		logrus.Warn("The cancelled task did not reach the canceled state; check scheduler behavior.")
+		require.Fail(t, "Expected the cancelled task to be in the canceled state.")
+	}
+
+ fmt.Printf("Test Concurrent Cancellation: %-30s [OK], cost: %v\n",
computeTask, time.Since(bTime))
+}
+
+/*
+* @Description: This is the main test function for priority.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
factor string, waitSecond int) {
+ fmt.Print("start test priority\n")
+
+ // for scheduler, just test a simple task
+ var computeTask = "pagerank"
+
+ // TEST GROUP: PRIORITY
+ // 1. send priority tasks to single graph
+ // expect: the tasks should be executed in order of priority
+
+ SubTestPriority(t, expectRes, healthCheck, masterHttp, graphName,
computeTask, waitSecond)
+
+ // 2. send small tasks and large tasks to single graph
+ // expect: the small tasks should be executed first
+
+ SubTestSmall(t, expectRes, healthCheck, masterHttp, graphName,
computeTask, waitSecond)
+
+ // 3. send tasks that support concurrent execution to a single graph
+ // expect: the tasks should be executed concurrently
+ SubTestConcurrent(t, expectRes, healthCheck, masterHttp, graphName,
computeTask, waitSecond)
+
+ // 4. send dependent tasks to a single graph
+ // expect: the tasks should be executed in order of dependency
+
+ SubTestDepends(t, expectRes, healthCheck, masterHttp, graphName,
computeTask, waitSecond)
+
+ // 5. send same-priority tasks to a single graph
+ // expect: the tasks should be executed in order of submission time
+ // skipped: timing-based ordering is too fragile to assert reliably
+
+ // 6. send tasks to different graphs
+ // expect: the tasks should be executed concurrently
+ // already covered by SubTestSmall and SubTestDepends
+
+ // 7. send tasks with invalid dependency to single graph
+ // expect: the tasks should not be executed
+ SubTestInvalidDependency(t, expectRes, healthCheck, masterHttp,
graphName, computeTask, waitSecond)
+
+ // 8. send tasks concurrently and cancel them
+ // expect: the tasks should be cancelled
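+ // note: a short 3-second settle window is passed here instead of waitSecond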
+ SubTestConcurrentCancellation(t, expectRes, healthCheck, masterHttp,
graphName, computeTask, 3)
+}
diff --git a/vermeer/test/scheduler/routine.go
b/vermeer/test/scheduler/routine.go
new file mode 100644
index 00000000..e5ced6bb
--- /dev/null
+++ b/vermeer/test/scheduler/routine.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+ "fmt"
+ "testing"
+ "time"
+ "vermeer/client"
+ "vermeer/test/functional"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+)
+
+/*
+* @Description: SubTestRoutine tests the scheduler's behavior when submitting
tasks with cron expression.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
computeTask string, waitSecond int) {
+ fmt.Printf("Test Routine start with task: %s\n", computeTask)
+ bTime := time.Now()
+ computeTest, err := functional.MakeComputeTask(computeTask)
+ require.NoError(t, err)
+ computeTest.Init(graphName[0], computeTask, expectRes, waitSecond,
masterHttp, t, healthCheck)
+ taskComputeBody := computeTest.TaskComputeBody()
+
+ // every 1 minute
+ taskComputeBody["cron_expr"] = "* * * * *"
+
+ logrus.Infof("params for routine test: %+v", taskComputeBody)
+
+ taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody)
+ // computeTest.CheckRes()
+
+ // wait for a while and check again
+ time.Sleep(2 * time.Minute)
+
+ // verify the cron run was scheduled: the check below assumes the follow-up
+ // run is assigned taskid+1 and reports a positive start sequence once it runs
+ queue := []int32{}
+ queue = append(queue, int32(taskid+1))
+ result, err := masterHttp.GetTaskStartSequence(queue)
+ require.NoError(t, err)
+ require.Equal(t, 1, len(result.Sequence))
+ require.Greater(t, result.Sequence[0], int32(0))
+
+ masterHttp.GetTaskCancel(int(taskid))
+
+ fmt.Printf("Test Routine: %-30s [OK], cost: %v\n", computeTask,
time.Since(bTime))
+}
+
+func TestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string,
factor string, waitSecond int) {
+ var computeTask = "pagerank"
+
+ // TEST GROUP: ROUTINE
+ // 1. send tasks to single graph
+ // expect: the tasks should be executed timely
+
+ SubTestRoutine(t, expectRes, healthCheck, masterHttp, graphName,
computeTask, waitSecond)
+}
diff --git a/vermeer/test/scheduler/test_scheduler.go
b/vermeer/test/scheduler/test_scheduler.go
new file mode 100644
index 00000000..7d0274cb
--- /dev/null
+++ b/vermeer/test/scheduler/test_scheduler.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+ "fmt"
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "vermeer/client"
+ "vermeer/test/functional"
+)
+
+/*
+* @Description: This is the main test function for scheduler.
+* @Param t
+* @Param expectResPath
+* @Param masterHttpAddr
+* @Param graphName
+* @Param factor
+* @Param waitSecond
+* @Note: You must start at least two workers, named worker01 and worker04, in your config.yaml
+ */
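+// A minimal sketch of one way to bring such a cluster up, assuming the
+// docker-compose.yaml added in this change defines the master and the two
+// workers (service names here are illustrative, not confirmed):
+//
+//	docker compose up -d master worker01 worker04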
+func TestScheduler(t *testing.T, expectResPath string, masterHttpAddr string,
graphName string, factor string, waitSecond int) {
+ fmt.Print("start test scheduler\n")
+
+ startTime := time.Now()
+ expectRes, err := functional.GetExpectRes(expectResPath)
+ require.NoError(t, err)
+
+ masterHttp := client.VermeerClient{}
+ masterHttp.Init("http://"+masterHttpAddr, http.DefaultClient)
+
+ // health check
+ healthCheck := functional.HealthCheck{}
+ healthCheck.Init(t, &masterHttp)
+ healthCheck.DoHealthCheck()
+
+ // load graph first
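+ // assumption: the numeric argument to TaskLoadBodyWithNum shifts which input
+ // files are loaded, so the two graphs differ in size and give the
+ // small-vs-large and cross-graph sub-tests distinct material to schedule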
+ loadTest1 := functional.LoadTaskLocal{}
+ loadTest1.Init(graphName+"_1", expectRes, &masterHttp, waitSecond, t,
&healthCheck)
+ loadTest1.SendLoadRequest(loadTest1.TaskLoadBodyWithNum(0))
+ loadTest1.CheckGraph()
+
+ loadTest2 := functional.LoadTaskLocal{}
+ loadTest2.Init(graphName+"_2", expectRes, &masterHttp, waitSecond, t,
&healthCheck)
+ loadTest2.SendLoadRequest(loadTest2.TaskLoadBodyWithNum(20))
+ // loadTest2.CheckGraph()
+
+ TestPriority(t, expectRes, &healthCheck, &masterHttp,
[]string{graphName + "_1", graphName + "_2"}, factor, waitSecond)
+
+ TestBatch(t, expectRes, &healthCheck, &masterHttp, []string{graphName +
"_1"}, factor, waitSecond)
+
+ TestRoutine(t, expectRes, &healthCheck, &masterHttp, []string{graphName
+ "_2"}, factor, waitSecond)
+
+ // Error handling: cancel task
+ cancelTask := functional.CancelTask{}
+ cancelTask.CancelTask(t, &masterHttp, graphName+"_1")
+ cancelTask.CancelTask(t, &masterHttp, graphName+"_2")
+ fmt.Print("test cancel task [OK]\n")
+
+ // Finally, delete graph
+ deleteGraph := functional.DeleteGraph{}
+ deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_1")
+ deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_2")
+ fmt.Print("test delete graph [OK]\n")
+
+ fmt.Printf("client test finished, cost time:%v\n",
time.Since(startTime))
+}
diff --git a/vermeer/vermeer_test.go b/vermeer/vermeer_test.go
index 4dde004d..fcc93272 100644
--- a/vermeer/vermeer_test.go
+++ b/vermeer/vermeer_test.go
@@ -31,6 +31,7 @@ import (
"vermeer/client"
"vermeer/test/functional"
+ "vermeer/test/scheduler"
)
var (
@@ -95,6 +96,8 @@ func TestVermeer(t *testing.T) {
t.Run("algorithms", testAlgorithms)
case "function":
t.Run("function", testFunction)
+ case "scheduler":
+ t.Run("scheduler", testScheduler)
}
}
@@ -102,10 +105,15 @@ func testFunction(t *testing.T) {
functional.TestFunction(t, expectResPath, masterHttpAddr, graphName,
factor, waitSecond)
}
+func testScheduler(t *testing.T) {
+ scheduler.TestScheduler(t, expectResPath, masterHttpAddr, graphName,
factor, waitSecond)
+}
+
func testAlgorithms(t *testing.T) {
// todo: add more algorithm names
var computeTasks = []string{"pagerank", "lpa", "wcc", "degree_out",
"degree_in", "degree_both", "triangle_count",
"sssp", "closeness_centrality", "betweenness_centrality",
"kcore", "jaccard", "ppr", "clustering_coefficient", "scc", "louvain"}
+ // var computeTasks = []string{"pagerank"}
startTime := time.Now()
expectRes, err := functional.GetExpectRes(expectResPath)
@@ -158,6 +166,7 @@ func testAlgorithms(t *testing.T) {
taskComputeBody["output.need_query"] = needQuery
if sendType == "async" {
computeTest.SendComputeReqAsync(taskComputeBody)
+ // computeTest.SendComputeReqAsyncBatchPriority(10, taskComputeBody) // send multiple requests asynchronously
} else {
computeTest.SendComputeReqSync(taskComputeBody)
}