This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new 03978808 feat(vermeer): support task priority based scheduling (#336)
03978808 is described below

commit 039788087ff4d3d05ad0a4b9cc05789d61af3bfb
Author: Jingkai Yang <[email protected]>
AuthorDate: Mon Oct 27 16:35:23 2025 +0800

    feat(vermeer): support task priority based scheduling (#336)
    
    * feat(Scheduler): Support priority/elder/depends based scheduling.
    
    * chore: fix some tiny errors
    
    * Update vermeer/.gitignore
    
    * Update CodeQL workflow to use latest actions
    
    ---------
    
    Co-authored-by: imbajin <[email protected]>
---
 .github/workflows/codeql-analysis.yml              |  14 +-
 vermeer/.gitignore                                 |   4 +
 vermeer/README.md                                  |  22 +
 vermeer/README.zh-CN.md                            |  20 +
 vermeer/apps/graphio/local_file.go                 |   1 +
 vermeer/apps/master/bl/compute_task.go             |   2 +
 vermeer/apps/master/bl/grpc_handlers.go            |   6 +
 vermeer/apps/master/bl/load_task.go                |   3 +
 vermeer/apps/master/bl/scheduler_bl.go             | 490 ++++++++++++++-----
 vermeer/apps/master/bl/task_bl.go                  |  61 ++-
 vermeer/apps/master/bl/task_creator.go             |  22 +
 vermeer/apps/master/bl/worker_bl.go                |   2 +-
 vermeer/apps/master/master_main.go                 |   1 +
 vermeer/apps/master/schedules/broker.go            |  40 +-
 .../schedules/scheduler_algorithm_manager.go       | 524 +++++++++++++++++++++
 .../master/schedules/scheduler_cron_manager.go     | 161 +++++++
 .../master/schedules/scheduler_resource_manager.go | 258 ++++++++++
 .../master/schedules/scheduler_task_manager.go     | 291 ++++++++++++
 vermeer/apps/master/services/http_tasks.go         |  32 +-
 vermeer/apps/master/services/router.go             |   7 +-
 vermeer/apps/master/workers/worker_manager.go      |   4 +
 vermeer/apps/structure/task.go                     |   6 +
 vermeer/client/client.go                           |  70 +++
 vermeer/client/structure.go                        |  13 +
 vermeer/config/master.ini                          |   1 +
 vermeer/config/{master.ini => worker04.ini}        |  12 +-
 vermeer/docker-compose.yaml                        |  46 ++
 vermeer/go.mod                                     |   1 +
 vermeer/go.sum                                     |   2 +
 vermeer/test/functional/compute_base.go            |  97 ++++
 vermeer/test/functional/compute_task.go            |   3 +
 vermeer/test/functional/http_interface.go          |  16 +
 vermeer/test/functional/load_local.go              |  28 ++
 .../load_local.go => scheduler/batch.go}           |  42 +-
 vermeer/test/scheduler/priority.go                 | 366 ++++++++++++++
 vermeer/test/scheduler/routine.go                  |  81 ++++
 vermeer/test/scheduler/test_scheduler.go           |  87 ++++
 vermeer/vermeer_test.go                            |   9 +
 38 files changed, 2667 insertions(+), 178 deletions(-)

diff --git a/.github/workflows/codeql-analysis.yml 
b/.github/workflows/codeql-analysis.yml
index 779c4406..0539d2fe 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -32,10 +32,10 @@ jobs:
 
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
 
       - name: Setup Java JDK
-        uses: actions/setup-java@v3
+        uses: actions/setup-java@v4
         with:
           distribution: 'zulu'
           java-version: '11'
@@ -48,7 +48,7 @@ jobs:
 
       # Initializes the CodeQL tools for scanning.
       - name: Initialize CodeQL
-        uses: github/codeql-action/init@v2
+        uses: github/codeql-action/init@v3
         with:
           languages: ${{ matrix.language }}
           # If you wish to specify custom queries, you can do so here or in a 
config file.
@@ -59,7 +59,7 @@ jobs:
       # Autobuild attempts to build any compiled languages (C/C++, C#, or 
Java).
       # If this step fails, then you should remove it and run the build 
manually (see below)
       - name: Autobuild
-        uses: github/codeql-action/autobuild@v2
+        uses: github/codeql-action/autobuild@v3
 
       # ℹ️ Command-line programs to run using the OS shell.
       # 📚 https://git.io/JvXDl
@@ -73,13 +73,13 @@ jobs:
       #   make release
 
       - name: Perform CodeQL Analysis
-        uses: github/codeql-action/analyze@v2
+        uses: github/codeql-action/analyze@v3
 
   dependency-review:
     runs-on: ubuntu-latest
     steps:
       - name: 'Checkout Repository'
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
       - name: 'Dependency Review'
-        uses: actions/dependency-review-action@v3
+        uses: actions/dependency-review-action@v4
         
diff --git a/vermeer/.gitignore b/vermeer/.gitignore
index 540a67ae..53629789 100644
--- a/vermeer/.gitignore
+++ b/vermeer/.gitignore
@@ -83,3 +83,7 @@ node_modules/
 /output/
 /bin/*
 !/bin/*.sh
+
+# Others #
+######################
+test/case/
diff --git a/vermeer/README.md b/vermeer/README.md
index 77695662..55ca14b0 100644
--- a/vermeer/README.md
+++ b/vermeer/README.md
@@ -3,6 +3,28 @@
 ## Introduction
 Vermeer is a high-performance distributed graph computing platform based on 
memory, supporting more than 15 graph algorithms, custom algorithm extensions, 
and custom data source access.
 
+## Run with Docker
+
+Pull the image:
+```
+docker pull hugegraph/vermeer:latest
+```
+
+Create local configuration files, for example, `~/master.ini` and 
`~/worker.ini`.
+
+Run with Docker. The `--env` flag selects the configuration file by its base name (e.g. `--env=master` loads `master.ini`).
+
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
+We've also provided a `docker-compose` file. Once you've created 
`~/master.ini` and `~/worker.ini`, and updated the `master_peer` in 
`worker.ini` to `172.20.0.10:6689`, you can run it using the following command:
+
+```
+docker-compose up -d
+```
+
 ## Start
 
 ```
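
[Editor's note] For readers wiring this up by hand, here is a minimal sketch of the `worker.ini` change the compose setup needs. Only the `master_peer` key and the `172.20.0.10:6689` address are confirmed by this commit; every other key should be copied from the shipped templates under `vermeer/config/`:

```
# ~/worker.ini -- hypothetical fragment for the docker-compose setup;
# copy the remaining keys from the templates in vermeer/config/
master_peer=172.20.0.10:6689
```
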
diff --git a/vermeer/README.zh-CN.md b/vermeer/README.zh-CN.md
index 34dcec04..1b125fa3 100644
--- a/vermeer/README.zh-CN.md
+++ b/vermeer/README.zh-CN.md
@@ -3,6 +3,26 @@
 ## Introduction
 Vermeer is a high-performance, memory-based distributed graph computing platform that supports 15+ graph algorithms, custom algorithm extensions, and custom data source access.
 
+## Run with Docker
+
+Pull the image:
+```
+docker pull hugegraph/vermeer:latest
+```
+
+Create the local configuration files first, e.g. `~/master.ini` and `~/worker.ini`.
+
+Run with Docker; the `--env` flag specifies the configuration file name.
+```
+master: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=master
+worker: docker run -v ~/:/go/bin/config hugegraph/vermeer --env=worker
+```
+
+We also provide a `docker-compose` file. After creating `~/master.ini` and `~/worker.ini` and changing `master_peer` in `worker.ini` to `172.20.0.10:6689`, start everything with:
+```
+docker-compose up -d
+```
+
 ## Run
 
 ```
diff --git a/vermeer/apps/graphio/local_file.go 
b/vermeer/apps/graphio/local_file.go
index 8cdb7262..7d611930 100644
--- a/vermeer/apps/graphio/local_file.go
+++ b/vermeer/apps/graphio/local_file.go
@@ -82,6 +82,7 @@ func (a *LocalMaker) MakeTasks(params map[string]string, 
taskID int32) ([]LoadPa
                        logrus.Errorf(s)
                        return nil, errors.New(s)
                }
+               logrus.Debugf("MakeTasks LoadTypeLocal parsed files: %s, start: %d, end: %d", files, s, e)
                for i := s; i <= e; i++ {
                        part := LoadPartition{}
                        part.Init(partID, taskID, LoadPartTypeVertex)
diff --git a/vermeer/apps/master/bl/compute_task.go 
b/vermeer/apps/master/bl/compute_task.go
index 80ca8052..e8abef04 100644
--- a/vermeer/apps/master/bl/compute_task.go
+++ b/vermeer/apps/master/bl/compute_task.go
@@ -142,6 +142,8 @@ func (ctb *ComputeTaskBl) ComputeTaskStatus(
                                }
                        }
                        taskMgr.ForceState(computeTask.Task, 
structure.TaskStateComplete)
+                       // for scheduler, mark task complete
+                       Scheduler.taskManager.MarkTaskComplete(taskId)
                        graph.SubUsingNum()
                        computeTask.FreeMemory()
                        needQuery := options.GetInt(computeTask.Task.Params, 
"output.need_query") == 1
diff --git a/vermeer/apps/master/bl/grpc_handlers.go 
b/vermeer/apps/master/bl/grpc_handlers.go
index e1c23558..2734c063 100644
--- a/vermeer/apps/master/bl/grpc_handlers.go
+++ b/vermeer/apps/master/bl/grpc_handlers.go
@@ -26,6 +26,7 @@ import (
        "time"
        "vermeer/apps/compute"
        "vermeer/apps/graphio"
+       "vermeer/apps/master/schedules"
        "vermeer/apps/master/threshold"
        "vermeer/apps/master/workers"
        pb "vermeer/apps/protos"
@@ -103,6 +104,11 @@ func (h *ServerHandler) SayHelloMaster(ctx 
context.Context, req *pb.HelloMasterR
                logrus.Errorf("failed to add a WorkerClient to the 
WorkerManager, error: %s", err)
                return &pb.HelloMasterResp{}, err
        }
+       _, err = Scheduler.ChangeWorkerStatus(reqWorker.Name, 
schedules.WorkerOngoingStatusIdle)
+       if err != nil {
+               logrus.Errorf("failed to change worker status to idle, error: 
%s", err)
+               return &pb.HelloMasterResp{}, err
+       }
 
        logrus.Infof("worker say hello name: %s and set to workgroup: %s, 
client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
 
diff --git a/vermeer/apps/master/bl/load_task.go 
b/vermeer/apps/master/bl/load_task.go
index 4e1f79f4..80c0023b 100644
--- a/vermeer/apps/master/bl/load_task.go
+++ b/vermeer/apps/master/bl/load_task.go
@@ -204,6 +204,9 @@ func (lb *LoadTaskBl) LoadTaskStatus(taskId int32, state 
string, workerName stri
                        loadTask.Task.SetState(structure.TaskStateLoaded)
                        //TaskMgr.ForceState(loadTask.Task, 
structure.TaskStateLoaded)
 
+                       // for scheduler, mark task complete
+                       Scheduler.taskManager.MarkTaskComplete(taskId)
+
                        logrus.Infof("graph: %s, vertex: %d, edge: %d", 
graph.Name, graph.VertexCount, graph.EdgeCount)
                        for _, w := range graph.Workers {
                                logrus.Infof(
diff --git a/vermeer/apps/master/bl/scheduler_bl.go 
b/vermeer/apps/master/bl/scheduler_bl.go
index bc4ccac4..95578949 100644
--- a/vermeer/apps/master/bl/scheduler_bl.go
+++ b/vermeer/apps/master/bl/scheduler_bl.go
@@ -19,6 +19,7 @@ package bl
 
 import (
        "errors"
+       "fmt"
        "strconv"
        "time"
        "vermeer/apps/common"
@@ -28,16 +29,58 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+// ScheduleBl is the scheduler's business logic, built on the resource,
+// algorithm, task, and cron managers.
 type ScheduleBl struct {
        structure.MutexLocker
-       dispatchLocker   structure.MutexLocker
-       spaceQueue       *schedules.SpaceQueue
-       broker           *schedules.Broker
-       startChan        chan *structure.TaskInfo
-       isDispatchPaused bool
+       // resource management
+       resourceManager *schedules.SchedulerResourceManager
+       // algorithm management
+       algorithmManager *schedules.SchedulerAlgorithmManager
+       // task management
+       taskManager *schedules.SchedulerTaskManager
+       // cron management
+       cronManager *schedules.SchedulerCronManager
+       // start channel for tasks to be started
+       startChan chan *structure.TaskInfo
+       // configurations
+       startChanSize  int
+       tickerInterval int
+       softSchedule   bool
 }
 
+// Init loads the configuration, creates the managers, and starts the
+// ticker and start-channel goroutines.
 func (s *ScheduleBl) Init() {
+       logrus.Info("Initializing ScheduleBl...")
+       s.LoadConfig()
+       startChan := make(chan *structure.TaskInfo, s.startChanSize)
+       s.startChan = startChan
+
+       s.resourceManager = &schedules.SchedulerResourceManager{}
+       s.resourceManager.Init()
+       s.taskManager = &schedules.SchedulerTaskManager{}
+       s.taskManager.Init()
+       s.algorithmManager = &schedules.SchedulerAlgorithmManager{}
+       s.algorithmManager.Init()
+       s.cronManager = &schedules.SchedulerCronManager{}
+       s.cronManager.Init(s.QueueTaskFromTemplate)
+       go s.startTicker()
+       go s.waitingStartedTask()
+}
+
+// LoadConfig reads start_chan_size, ticker_interval, and soft_schedule from
+// the common configuration, falling back to defaults on parse errors.
+func (s *ScheduleBl) LoadConfig() {
+       // startChanSize
        const defaultChanSizeConfig = "10"
        chanSize := common.GetConfigDefault("start_chan_size", 
defaultChanSizeConfig).(string)
        // Convert string to int
@@ -47,22 +90,135 @@ func (s *ScheduleBl) Init() {
                logrus.Infof("using default start_chan_size: %s", 
defaultChanSizeConfig)
                chanSizeInt, _ = strconv.Atoi(defaultChanSizeConfig)
        }
-       startChan := make(chan *structure.TaskInfo, chanSizeInt)
-       s.startChan = startChan
-       s.spaceQueue = (&schedules.SpaceQueue{}).Init()
-       s.broker = (&schedules.Broker{}).Init()
+       s.startChanSize = chanSizeInt
 
-       go s.waitingTask()
-       go s.startTicker()
+       // tickerInterval
+       const defaultTickerInterval = "3"
+       tickerInterval := common.GetConfigDefault("ticker_interval", 
defaultTickerInterval).(string)
+       tickerIntervalInt, err := strconv.Atoi(tickerInterval)
+       if err != nil {
+               logrus.Errorf("failed to convert ticker_interval to int: %v", 
err)
+               logrus.Infof("using default ticker_interval: %s", 
defaultTickerInterval)
+               tickerIntervalInt, _ = strconv.Atoi(defaultTickerInterval)
+       }
+       s.tickerInterval = tickerIntervalInt
+
+       // softSchedule
+       softSchedule := common.GetConfigDefault("soft_schedule", 
"true").(string)
+       s.softSchedule = softSchedule == "true"
+
+       logrus.Infof("ScheduleBl configuration: startChanSize=%d, 
tickerInterval=%d, softSchedule=%v",
+               s.startChanSize, s.tickerInterval, s.softSchedule)
 }
 
-func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
-       return s.spaceQueue.PeekTailTask(space)
+// startTicker triggers a scheduling pass every tickerInterval seconds.
+func (s *ScheduleBl) startTicker() {
+       // Create a ticker with the specified interval
+       ticker := time.Tick(time.Duration(s.tickerInterval) * time.Second)
+
+       for range ticker {
+               logrus.Debug("Ticker ticked")
+               s.TryScheduleNextTasks()
+       }
+}
+
+// TryScheduleNextTasks runs a scheduling pass, recovering from any panic.
+// Pass noLock=true when the caller already holds the scheduler lock.
+func (s *ScheduleBl) TryScheduleNextTasks(noLock ...bool) {
+       defer func() {
+               if err := recover(); err != nil {
+                       logrus.Errorln("TryScheduleNextTasks() has been 
recovered:", err)
+               }
+       }()
+
+       if err := s.tryScheduleInner(s.softSchedule, noLock...); err != nil {
+               logrus.Errorf("do scheduling error:%v", err)
+       }
+}
+
+// tryScheduleInner is the main scheduling routine: it refreshes task-to-worker-group
+// assignments, collects idle and concurrent worker groups, asks the algorithm
+// manager for the next tasks, and pushes them onto the start channel.
+func (s *ScheduleBl) tryScheduleInner(softSchedule bool, noLock ...bool) error 
{
+       // acquire the scheduler lock unless the caller already holds it
+       if !(len(noLock) > 0 && noLock[0]) {
+               defer s.Unlock(s.Lock())
+       }
+
+       // step 1: make sure every task is allocated to a worker group;
+       // the TaskManager assigns a worker group to each task
+       s.taskManager.RefreshTaskToWorkerGroupMap()
+
+       // step 2: get available resources and tasks
+       logrus.Debugf("scheduling next tasks, softSchedule: %v", softSchedule)
+       idleWorkerGroups := s.resourceManager.GetIdleWorkerGroups()
+       concurrentWorkerGroups := s.resourceManager.GetConcurrentWorkerGroups()
+       allTasks := s.taskManager.GetAllTasksNotComplete()
+       if len(allTasks) == 0 || (len(idleWorkerGroups) == 0 && 
len(concurrentWorkerGroups) == 0) {
+               logrus.Debugf("no available tasks or workerGroups, allTasks: 
%d, workerGroups: %d/%d",
+                       len(allTasks), len(idleWorkerGroups), 
len(concurrentWorkerGroups))
+               return nil
+       }
+       logrus.Debugf("all tasks: %d, workerGroups: %d/%d", len(allTasks), 
len(idleWorkerGroups), len(concurrentWorkerGroups))
+
+       // TODO: decide whether the task may run concurrently,
+       // based not only on user settings but also on scheduler settings
+
+       // step 3: return the task with the highest priority or small tasks 
which can be executed immediately
+       taskToWorkerGroupMap := s.taskManager.GetTaskToWorkerGroupMap()
+       nextTasks, err := s.algorithmManager.ScheduleNextTasks(allTasks, 
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+       if err != nil {
+               logrus.Errorf("failed to schedule next tasks: %v", err)
+               return err
+       }
+       logrus.Debugf("scheduled %d tasks", len(nextTasks))
+       // step 4: send to start channel
+       for _, task := range nextTasks {
+               if task == nil {
+                       logrus.Warnf("received a nil task from algorithm 
manager")
+                       continue
+               }
+               if task.State != structure.TaskStateWaiting {
+                       logrus.Warnf("task '%d' is not in waiting state, 
current state: %s", task.ID, task.State)
+                       continue
+               }
+               logrus.Infof("scheduling task '%d' with type '%s' to start 
channel", task.ID, task.Type)
+               select {
+               case s.startChan <- task:
+                       logrus.Infof("task '%d' sent to start channel", task.ID)
+               default:
+                       errMsg := fmt.Sprintf("start channel is full, cannot schedule task %d", task.ID)
+                       logrus.Error(errMsg)
+                       taskMgr.SetError(task, errMsg)
+               }
+       }
+
+       return nil
 }
 
 // QueueTask Add the task to the inner queue.
-// The tasks will be executed in order from the queue.
 // If the task exists, return false.
+// It also validates declared preorder dependencies and registers a cron job
+// when the task carries a valid cron expression.
 func (s *ScheduleBl) QueueTask(taskInfo *structure.TaskInfo) (bool, error) {
        if taskInfo == nil {
                return false, errors.New("the argument `taskInfo` is nil")
@@ -72,81 +228,161 @@ func (s *ScheduleBl) QueueTask(taskInfo 
*structure.TaskInfo) (bool, error) {
                return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
        }
 
-       //defer s.Unlock(s.Lock())
+       defer s.Unlock(s.Lock())
        if err := taskMgr.SetState(taskInfo, structure.TaskStateWaiting); err 
!= nil {
                return false, err
        }
 
+       logrus.Debugf("queuing task %d with parameters: %+v", taskInfo.ID, 
taskInfo)
+
+       // verify that all declared preorder dependencies exist
+       if len(taskInfo.Preorders) > 0 {
+               for _, depTaskID := range taskInfo.Preorders {
+                       depTask := taskMgr.GetTaskByID(depTaskID)
+                       if depTask == nil {
+                               err := errors.New("the dependency task with ID 
" + strconv.Itoa(int(depTaskID)) + " does not exist")
+                               logrus.Error(err)
+                               taskMgr.SetError(taskInfo, err.Error())
+                               return false, err
+                       }
+               }
+       }
+
        // Notice: Ensure successful invocation.
-       ok, err := s.spaceQueue.PushTask(taskInfo)
+       // make sure the task is allocated to a worker group
+       ok, err := s.taskManager.QueueTask(taskInfo)
        if err != nil {
                taskMgr.SetError(taskInfo, err.Error())
                return ok, err
        }
 
-       go s.dispatch()
+       if s.cronManager.CheckCronExpression(taskInfo.CronExpr) == nil {
+               if err := s.cronManager.AddCronTask(taskInfo); err != nil {
+                       logrus.Errorf("failed to add cron task: %v", err)
+                       return false, err
+               }
+               logrus.Infof("added cron task for task '%d' with expression 
'%s'", taskInfo.ID, taskInfo.CronExpr)
+       }
 
        return ok, nil
 }
 
-func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
-       if taskInfo == nil {
-               return errors.New("the argument `taskInfo` is nil")
+// QueueTaskFromTemplate clones the given template task and queues the clone;
+// it is used by cron-triggered tasks. It returns the new task's ID.
+func (s *ScheduleBl) QueueTaskFromTemplate(template *structure.TaskInfo) 
(int32, error) {
+       if template == nil {
+               return -1, errors.New("the argument `template` is nil")
        }
 
-       s.Lock()
-       isHeadTask := s.spaceQueue.IsHeadTask(taskInfo.ID)
-       task := s.spaceQueue.RemoveTask(taskInfo.ID)
-       s.Unlock(nil)
+       bc := &baseCreator{}
+       taskInfo, err := bc.CopyTaskInfo(template)
+       if err != nil {
+               logrus.Errorf("failed to copy task info from template, template 
ID: %d, caused by: %v", template.ID, err)
+               return -1, err
+       }
+       if _, err := bc.saveTaskInfo(taskInfo); err != nil {
+               return -1, err
+       }
 
-       isInQueue := false
-       if task != nil {
-               logrus.Infof("removed task '%d' from space queue", task.ID)
-               isInQueue = true
+       ok, err := s.QueueTask(taskInfo)
+       if err != nil || !ok {
+               logrus.Errorf("failed to queue task from template, template ID: 
%d, caused by: %v", template.ID, err)
+               return -1, err
        }
 
-       if isInQueue && !isHeadTask {
-               if err := taskMgr.SetState(taskInfo, 
structure.TaskStateCanceled); err != nil {
-                       return err
-               }
+       logrus.Infof("queued task '%d' from template '%d'", taskInfo.ID, 
template.ID)
 
-               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
-       } else {
-               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
-               return s.handleCancelTask(taskInfo)
+       return taskInfo.ID, nil
+}
+
+// BatchQueueTask queues multiple tasks while dispatch is paused, so none of
+// them starts before the whole batch is enqueued. It returns per-task results.
+func (s *ScheduleBl) BatchQueueTask(taskInfos []*structure.TaskInfo) ([]bool, 
[]error) {
+       if len(taskInfos) == 0 {
+               return []bool{}, []error{}
        }
 
-       return nil
-}
+       s.PauseDispatch()
 
-func (s *ScheduleBl) IsDispatchPaused() bool {
-       return s.isDispatchPaused
-}
-func (s *ScheduleBl) PauseDispatch() {
-       s.isDispatchPaused = true
-}
+       defer s.ResumeDispatch()
+       // defer s.Unlock(s.Lock())
 
-func (s *ScheduleBl) ResumeDispatch() {
-       s.isDispatchPaused = false
+       // allocate with zero length: appending to a len(n) slice would
+       // prepend n zero values to the results
+       errs := make([]error, 0, len(taskInfos))
+       oks := make([]bool, 0, len(taskInfos))
+
+       for _, taskInfo := range taskInfos {
+               ok, err := s.QueueTask(taskInfo)
+               if err != nil {
+                       logrus.Errorf("failed to queue task '%d': %v", taskInfo.ID, err)
+               }
+               errs = append(errs, err)
+               oks = append(oks, ok)
+       }
+
+       return oks, errs
 }
 
-func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
-       return s.spaceQueue.AllTasks()
+// ******** CloseCurrent ********
+func (s *ScheduleBl) CloseCurrent(taskId int32, removeWorkerName ...string) 
error {
+       defer s.Unlock(s.Lock())
+
+       // remove the task from scheduler tracking
+       s.taskManager.RemoveTask(taskId)
+       // release the worker group
+       s.resourceManager.ReleaseByTaskID(taskId)
+
+       if len(removeWorkerName) > 0 {
+               // a worker is being removed, so stop any cron job for this task
+               // as well; a normal close leaves the cron job in place
+               s.cronManager.DeleteTask(taskId)
+               // remove the worker from resource manager
+               workerName := removeWorkerName[0]
+               if workerName == "" {
+                       return errors.New("the argument `removeWorkerName` is 
empty")
+               }
+               logrus.Infof("removing worker '%s' from resource manager", 
workerName)
+               // call the resource manager directly: s.ChangeWorkerStatus would
+               // re-acquire the scheduler lock that CloseCurrent already holds
+               s.resourceManager.ChangeWorkerStatus(workerName, schedules.WorkerOngoingStatusDeleted)
+       }
+
+       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
+       s.TryScheduleNextTasks(true)
+       return nil
 }
 
-func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
-       return s.spaceQueue.SpaceTasks(space)
+// ChangeWorkerStatus is called when a worker comes online or goes offline;
+// it updates the resource manager and triggers a scheduling pass.
+func (s *ScheduleBl) ChangeWorkerStatus(workerName string, status 
schedules.WorkerOngoingStatus) (bool, error) {
+       defer s.Unlock(s.Lock())
+       s.resourceManager.ChangeWorkerStatus(workerName, status)
+
+       logrus.Infof("worker '%s' status changed to '%s'", workerName, status)
+       // After changing the worker status, we may need to reschedule tasks
+       s.TryScheduleNextTasks(true)
+
+       return true, nil
 }
 
-func (s *ScheduleBl) CloseCurrent(taskId int32) error {
-       logrus.Infof("invoke dispatch when task '%d' is closed", taskId)
-       s.dispatch()
+// ******** START TASK ********
+func (s *ScheduleBl) waitingStartedTask() {
+       for taskInfo := range s.startChan {
+               if taskInfo == nil {
+                       logrus.Warnf("received a nil task from startChan")
+                       continue
+               }
 
-       return nil
+               logrus.Infof("chan received task '%d' to start", taskInfo.ID)
+               s.handleStartTask(taskInfo)
+       }
 }
 
+// handleStartTask applies for an agent and actually starts the task.
 func (s *ScheduleBl) handleStartTask(taskInfo *structure.TaskInfo) {
-       agent, status, err := s.broker.ApplyAgent(taskInfo)
+       agent, status, err := s.resourceManager.GetAgentAndAssignTask(taskInfo)
 
        if err != nil {
                logrus.Errorf("apply agent error: %v", err)
@@ -175,24 +411,6 @@ func (s *ScheduleBl) handleStartTask(taskInfo 
*structure.TaskInfo) {
        go s.startWaitingTask(agent, taskInfo)
 }
 
-func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
-       logrus.Infof("received task '%d' to cancel", taskInfo.ID)
-       canceler, err := NewTaskCanceler(taskInfo)
-       if err != nil {
-               logrus.Errorf("failed to create new TaskCanceler err: %v", err)
-               taskMgr.SetError(taskInfo, err.Error())
-               return err
-       }
-
-       if err := canceler.CancelTask(); err != nil {
-               logrus.Errorf("failed to cancel task '%d', caused by: %v", 
taskInfo.ID, err)
-               taskMgr.SetError(taskInfo, err.Error())
-               return err
-       }
-
-       return nil
-}
-
 func (s *ScheduleBl) startWaitingTask(agent *schedules.Agent, taskInfo 
*structure.TaskInfo) {
        logrus.Infof("starting a task, id: %v, type: %v, graph: %v", 
taskInfo.ID, taskInfo.Type, taskInfo.GraphName)
 
@@ -202,6 +420,7 @@ func (s *ScheduleBl) startWaitingTask(agent 
*schedules.Agent, taskInfo *structur
                }
        }()
 
+       // TODO: do we need a lock here? Possible TOCTTOU between this state check and the start.
        if taskInfo.State != structure.TaskStateWaiting {
                logrus.Errorf("task state is not in 'Waiting' state, taskID: 
%v", taskInfo)
                return
@@ -222,68 +441,115 @@ func (s *ScheduleBl) startWaitingTask(agent 
*schedules.Agent, taskInfo *structur
 
        taskInfo.StartTime = time.Now()
        err = taskStarter.StartTask()
+
+       // only for test or debug, record the task start sequence
+       if err := s.taskManager.AddTaskStartSequence(taskInfo.ID); err != nil {
+               logrus.Errorf("failed to add task '%d' to start sequence: %v", 
taskInfo.ID, err)
+       }
+
        if err != nil {
                logrus.Errorf("failed to start a task, type: %s, taskID: %d, 
caused by: %v", taskInfo.Type, taskInfo.ID, err)
                taskMgr.SetError(taskInfo, err.Error())
        }
-
 }
 
-func (s *ScheduleBl) dispatch() {
-       defer func() {
-               if err := recover(); err != nil {
-                       logrus.Errorln("dispatch() has been recovered:", err)
+// ********* CANCEL TASK ********
+// handle cancel task
+// need to cancel cron task
+func (s *ScheduleBl) CancelTask(taskInfo *structure.TaskInfo) error {
+       if taskInfo == nil {
+               return errors.New("the argument `taskInfo` is nil")
+       }
+
+       defer s.Unlock(s.Lock())
+
+       isOngoing := s.taskManager.IsTaskOngoing(taskInfo.ID)
+       task := s.taskManager.RemoveTask(taskInfo.ID)
+       s.cronManager.DeleteTask(taskInfo.ID)
+       isInQueue := false
+       if task != nil {
+               logrus.Infof("removed task '%d' from the task queue", taskInfo.ID)
+               isInQueue = true
+       }
+
+       if isInQueue && !isOngoing {
+               if err := taskMgr.SetState(taskInfo, 
structure.TaskStateCanceled); err != nil {
+                       return err
                }
-       }()
 
-       if err := s.doDispatch(); err != nil {
-               logrus.Errorf("do dispatching error:%v", err)
+               logrus.Infof("set task '%d' to TaskStateCanceled", taskInfo.ID)
+       } else {
+               logrus.Infof("sending task '%d' to task canceler", taskInfo.ID)
+               return s.handleCancelTask(taskInfo)
        }
+
+       return nil
 }
 
-func (s *ScheduleBl) doDispatch() error {
-       if s.isDispatchPaused {
-               logrus.Warn("the dispatching was paused")
-               return nil
+func (s *ScheduleBl) handleCancelTask(taskInfo *structure.TaskInfo) error {
+       logrus.Infof("received task '%d' to cancel", taskInfo.ID)
+       canceler, err := NewTaskCanceler(taskInfo)
+       if err != nil {
+               logrus.Errorf("failed to create new TaskCanceler err: %v", err)
+               taskMgr.SetError(taskInfo, err.Error())
+               return err
        }
 
-       defer s.dispatchLocker.Unlock(s.dispatchLocker.Lock())
-
-       buffer := s.spaceQueue.HeadTasks()
-       if len(buffer) == 0 {
-               return nil
+       if err := canceler.CancelTask(); err != nil {
+               logrus.Errorf("failed to cancel task '%d', caused by: %v", 
taskInfo.ID, err)
+               taskMgr.SetError(taskInfo, err.Error())
+               return err
        }
 
-       for _, task := range buffer {
-               select {
-               case s.startChan <- task:
-               default:
-                       logrus.Warnf("the start channel is full, dropped task: 
%d", task.ID)
-               }
+       // set worker state to idle or concurrent running
+       s.resourceManager.ReleaseByTaskID(taskInfo.ID)
+
+       return nil
+}
 
+func (s *ScheduleBl) CancelCronTask(taskInfo *structure.TaskInfo) error {
+       if taskInfo == nil {
+               return errors.New("the argument `taskInfo` is nil")
        }
 
+       s.cronManager.DeleteTask(taskInfo.ID)
+
        return nil
 }
 
-func (s *ScheduleBl) waitingTask() {
-       for taskInfo := range s.startChan {
-               if taskInfo == nil {
-                       logrus.Warnf("recieved a nil task from startChan")
-                       return
-               }
+// ** Other Methods **
 
-               logrus.Infof("chan received task '%d' to start", taskInfo.ID)
-               s.handleStartTask(taskInfo)
-       }
+func (s *ScheduleBl) PeekSpaceTail(space string) *structure.TaskInfo {
+       return s.taskManager.GetLastTask(space)
 }
 
-func (s *ScheduleBl) startTicker() {
-       // Create a ticker that triggers every 3 seconds
-       ticker := time.Tick(3 * time.Second)
+func (s *ScheduleBl) IsDispatchPaused() bool {
+       return s.algorithmManager.IsDispatchPaused()
+}
 
-       for range ticker {
-               //logrus.Debug("Ticker ticked")
-               s.dispatch()
-       }
+func (s *ScheduleBl) PauseDispatch() {
+       s.algorithmManager.PauseDispatch()
+}
+
+func (s *ScheduleBl) ResumeDispatch() {
+       s.algorithmManager.ResumeDispatch()
+}
+
+func (s *ScheduleBl) AllTasksInQueue() []*structure.TaskInfo {
+       return s.taskManager.GetAllTasks()
+}
+
+func (s *ScheduleBl) TasksInQueue(space string) []*structure.TaskInfo {
+       return s.taskManager.GetTasksInQueue(space)
+}
+
+func (s *ScheduleBl) TaskStartSequence(queryTasks []int32) 
[]*structure.TaskInfo {
+       // Only for debug or test, get task start sequence
+       return s.taskManager.GetTaskStartSequence(queryTasks)
 }
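
[Editor's note] A side note on the `defer s.Unlock(s.Lock())` idiom used throughout this file: Go evaluates a deferred call's arguments immediately, so the lock is acquired on that line and released when the function returns. A minimal runnable sketch, assuming `structure.MutexLocker` behaves like the stand-in below (the real type may differ):

```go
package main

import "sync"

// MutexLocker is a hypothetical stand-in for vermeer's structure.MutexLocker,
// written only to illustrate the call pattern; the real type may differ.
type MutexLocker struct{ mu sync.Mutex }

func (m *MutexLocker) Lock() any    { m.mu.Lock(); return nil } // returns a token for Unlock
func (m *MutexLocker) Unlock(_ any) { m.mu.Unlock() }

type ScheduleBl struct{ MutexLocker }

func (s *ScheduleBl) criticalSection() {
	// s.Lock() runs here, when the defer statement is executed;
	// s.Unlock(...) runs when criticalSection returns.
	defer s.Unlock(s.Lock())
	// ... mutate shared scheduler state ...
}

func main() { (&ScheduleBl{}).criticalSection() }
```

This also explains the `noLock` flag on `tryScheduleInner`: with a non-reentrant lock, callers that already hold it must skip re-acquiring it.
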
diff --git a/vermeer/apps/master/bl/task_bl.go 
b/vermeer/apps/master/bl/task_bl.go
index fa3d5b58..1c7e43c9 100644
--- a/vermeer/apps/master/bl/task_bl.go
+++ b/vermeer/apps/master/bl/task_bl.go
@@ -20,7 +20,10 @@ package bl
 import (
        "errors"
        "fmt"
+       "math"
        "sort"
+       "strconv"
+       "strings"
        "time"
        "vermeer/apps/compute"
 
@@ -62,6 +65,56 @@ func (tb *TaskBl) CreateTaskInfo(
                return nil, err
        }
 
+       // for scheduler
+       taskInfo.Priority = 0
+       taskInfo.Preorders = make([]int32, 0)
+       taskInfo.Exclusive = true // default to true for now, can be set false 
by params
+       if params != nil {
+               if priority, ok := params["priority"]; ok {
+                       if p, err := strconv.ParseInt(priority, 10, 32); err == 
nil {
+                               if p < 0 {
+                                       return nil, fmt.Errorf("priority should 
be non-negative")
+                               }
+                               if p > math.MaxInt32 {
+                                       return nil, fmt.Errorf("priority 
exceeds maximum value: %d", math.MaxInt32)
+                               }
+                               taskInfo.Priority = int32(p)
+                       } else {
+                               logrus.Warnf("priority convert to int32 
error:%v", err)
+                               return nil, err
+                       }
+               }
+               if preorders, ok := params["preorders"]; ok {
+                       preorderList := strings.Split(preorders, ",")
+                       for _, preorder := range preorderList {
+                               if pid, err := strconv.ParseInt(preorder, 10, 
32); err == nil {
+                                       if taskMgr.GetTaskByID(int32(pid)) == 
nil {
+                                               return nil, 
fmt.Errorf("preorder task with ID %d does not exist", pid)
+                                       }
+                                       taskInfo.Preorders = 
append(taskInfo.Preorders, int32(pid))
+                               } else {
+                                       logrus.Warnf("preorder convert to int32 
error:%v", err)
+                                       return nil, err
+                               }
+                       }
+               }
+               if exclusive, ok := params["exclusive"]; ok {
+                       if ex, err := strconv.ParseBool(exclusive); err == nil {
+                               taskInfo.Exclusive = ex
+                       } else {
+                               logrus.Warnf("exclusive convert to bool 
error:%v", err)
+                               return nil, err
+                       }
+               }
+               if cronExpr, ok := params["cron_expr"]; ok {
+                       if err := 
Scheduler.cronManager.CheckCronExpression(cronExpr); err != nil {
+                               logrus.Warnf("cron_expr parse error:%v", err)
+                               return nil, err
+                       }
+                       taskInfo.CronExpr = cronExpr
+               }
+       }
+
        return taskInfo, nil
 }
 
@@ -146,6 +199,9 @@ func (tb *TaskBl) CancelTask(taskID int32) error {
                return fmt.Errorf("cannot cancel the task with id '%v' as it 
was not created by you", taskID)
        }
 
+       // stop the cron job if one exists
+       if err := Scheduler.CancelCronTask(task); err != nil {
+               logrus.Warnf("failed to cancel cron task '%d': %v", taskID, err)
+       }
+
        if task.State == structure.TaskStateCanceled {
                return fmt.Errorf("task had been in state canceled")
        }
@@ -206,13 +262,10 @@ func QueueExecuteTask(taskInfo *structure.TaskInfo) error 
{
 }
 
 func QueueExecuteTasks(tasksInfo []*structure.TaskInfo) []error {
-       defer Scheduler.Unlock(Scheduler.Lock())
-       errs := make([]error, 0, len(tasksInfo))
        for _, task := range tasksInfo {
                task.CreateType = structure.TaskCreateAsync
-               _, err := Scheduler.QueueTask(task)
-               errs = append(errs, err)
        }
+       _, errs := Scheduler.BatchQueueTask(tasksInfo)
        return errs
 }
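
[Editor's note] As a usage sketch of the new parameters parsed above (the key names and validation rules come from this diff; everything else, including how the map reaches `CreateTaskInfo`, is assumed):

```go
package main

import "fmt"

func main() {
	// Hypothetical scheduler-related task parameters, matching the keys
	// CreateTaskInfo reads in this change:
	params := map[string]string{
		"priority":  "5",         // non-negative int32; higher priority is scheduled first
		"preorders": "101,102",   // comma-separated IDs of tasks that must finish first
		"exclusive": "false",     // parsed with strconv.ParseBool; defaults to true when absent
		"cron_expr": "0 0 * * *", // validated by the cron manager; exact syntax depends on the vendored cron library
	}
	fmt.Println(params)
}
```
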
 
diff --git a/vermeer/apps/master/bl/task_creator.go 
b/vermeer/apps/master/bl/task_creator.go
index ca3f946c..a7353c85 100644
--- a/vermeer/apps/master/bl/task_creator.go
+++ b/vermeer/apps/master/bl/task_creator.go
@@ -99,6 +99,28 @@ func (bc *baseCreator) NewTaskInfo(graphName string, params 
map[string]string, t
        return task, nil
 }
 
+func (bc *baseCreator) CopyTaskInfo(src *structure.TaskInfo) 
(*structure.TaskInfo, error) {
+       if src == nil {
+               return nil, fmt.Errorf("the argument `src` should not be nil")
+       }
+
+       task, err := taskMgr.CreateTask(src.SpaceName, src.Type, 0)
+       if err != nil {
+               return nil, err
+       }
+
+       task.CreateType = structure.TaskCreateAsync
+       task.GraphName = src.GraphName
+       task.CreateUser = src.CreateUser
+       task.Params = src.Params
+       task.CronExpr = "" // clear cron expression for the new task
+       task.Priority = src.Priority
+       task.Preorders = src.Preorders
+       task.Exclusive = src.Exclusive
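+       // note: Params and Preorders are assigned by reference above, so the
+       // clone shares the template's map and slice with the original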
+
+       return task, nil
+}
+
 func (bc *baseCreator) saveTaskInfo(task *structure.TaskInfo) 
(*structure.TaskInfo, error) {
        if _, err := taskMgr.AddTask(task); err != nil {
                logrus.Errorf("failed to add a task to `TaskManager`, task: %v, 
cased by: %v", task, err)
diff --git a/vermeer/apps/master/bl/worker_bl.go 
b/vermeer/apps/master/bl/worker_bl.go
index e14d20c4..651e1e04 100644
--- a/vermeer/apps/master/bl/worker_bl.go
+++ b/vermeer/apps/master/bl/worker_bl.go
@@ -70,7 +70,7 @@ func (wb *WorkerBl) ReleaseWorker(workerName string) error {
                                //taskInfo.SetErrMsg(fmt.Sprintf("worker %v is 
offline", workerName))
                                taskMgr.SetError(taskInfo, fmt.Sprintf("worker 
%v is offline", workerName))
                                logrus.Warnf("set task %v status:error", 
taskInfo.ID)
-                               if err := Scheduler.CloseCurrent(taskInfo.ID); 
err != nil {
+                               if err := Scheduler.CloseCurrent(taskInfo.ID, 
workerName); err != nil {
                                        logrus.Errorf("failed to close task 
with ID: %d,err:%v", taskInfo.ID, err)
                                }
                                break
diff --git a/vermeer/apps/master/master_main.go 
b/vermeer/apps/master/master_main.go
index 2da20a2f..91cd3318 100644
--- a/vermeer/apps/master/master_main.go
+++ b/vermeer/apps/master/master_main.go
@@ -56,6 +56,7 @@ func Main() {
                services.SetUI(sen)
                logrus.Info("token-auth was activated")
        default:
+               services.SetAdminRouters(sen, auth.NoneAuthFilter)
                services.SetRouters(sen, auth.NoneAuthFilter)
                logrus.Warn("No authentication was activated.")
        }
diff --git a/vermeer/apps/master/schedules/broker.go 
b/vermeer/apps/master/schedules/broker.go
index fabdf415..435219dc 100644
--- a/vermeer/apps/master/schedules/broker.go
+++ b/vermeer/apps/master/schedules/broker.go
@@ -23,7 +23,7 @@ import (
 
        "github.com/sirupsen/logrus"
 
-       . "vermeer/apps/master/workers"
+       "vermeer/apps/master/workers"
 )
 
 type AgentStatus string
@@ -72,42 +72,44 @@ func (b *Broker) AllAgents() []*Agent {
        return res
 }
 
-func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo) (*Agent, 
AgentStatus, error) {
+func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo, forceApply ...bool) 
(*Agent, AgentStatus, map[string]*workers.WorkerClient, error) {
        if taskInfo == nil {
-               return nil, AgentStatusError, fmt.Errorf("taskInfo is nil")
+               return nil, AgentStatusError, nil, fmt.Errorf("taskInfo is nil")
        }
 
        defer b.Unlock(b.Lock())
 
        agent, workers, err := b.getAgent(taskInfo)
        if err != nil {
-               return nil, AgentStatusError, err
+               return nil, AgentStatusError, nil, err
        }
 
        if agent == nil {
-               return nil, AgentStatusPending, nil
+               return nil, AgentStatusPending, nil, nil
        }
 
        if workers == nil || len(workers) == 0 {
-               return nil, AgentStatusNoWorker, nil
+               return nil, AgentStatusNoWorker, nil, nil
        }
 
        if !b.isWorkersReady(workers) {
                logrus.Warnf("the workers of agent '%s' are not ready", 
agent.GroupName())
-               return nil, AgentStatusWorkerNotReady, nil
+               return nil, AgentStatusWorkerNotReady, nil, nil
        }
 
-       if b.isAgentBusy(agent) {
-               return nil, AgentStatusAgentBusy, nil
-       }
+       if !(len(forceApply) > 0 && forceApply[0]) {
+               if b.isAgentBusy(agent) {
+                       return nil, AgentStatusAgentBusy, nil, nil
+               }
 
-       if b.isWorkerBusy(workers, agent) {
-               return nil, AgentStatusWorkerBusy, nil
+               if b.isWorkerBusy(workers, agent) {
+                       return nil, AgentStatusWorkerBusy, nil, nil
+               }
        }
 
        agent.AssignTask(taskInfo)
 
-       return agent, AgentStatusOk, nil
+       return agent, AgentStatusOk, workers, nil
 }
 
 // func (b *Broker) isAgentReady(taskInfo *structure.TaskInfo, agent *Agent) 
bool {
@@ -124,7 +126,7 @@ func (b *Broker) ApplyAgent(taskInfo *structure.TaskInfo) 
(*Agent, AgentStatus,
 //     }
 // }
 
-func (b *Broker) isWorkersReady(workers map[string]*WorkerClient) bool {
+func (b *Broker) isWorkersReady(workers map[string]*workers.WorkerClient) bool 
{
        ok := false
        for _, w := range workers {
                if w.Connection == nil {
@@ -166,7 +168,7 @@ func (b *Broker) isAgentBusy(agent *Agent) bool {
        return busy
 }
 
-func (b *Broker) isWorkerBusy(workers map[string]*WorkerClient, agent *Agent) 
bool {
+func (b *Broker) isWorkerBusy(workers map[string]*workers.WorkerClient, agent 
*Agent) bool {
        for _, a := range b.agents {
                if a == agent {
                        continue
@@ -188,7 +190,7 @@ func (b *Broker) isWorkerBusy(workers 
map[string]*WorkerClient, agent *Agent) bo
        return false
 }
 
-func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*WorkerClient, error) {
+func (b *Broker) getAgent(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*workers.WorkerClient, error) {
        switch taskInfo.Type {
        case structure.TaskTypeLoad:
                fallthrough
@@ -202,7 +204,7 @@ func (b *Broker) getAgent(taskInfo *structure.TaskInfo) 
(*Agent, map[string]*Wor
 
 }
 
-func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*WorkerClient, error) {
+func (b *Broker) getAgentFromGraph(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*workers.WorkerClient, error) {
        graph := graphMgr.GetGraphByName(taskInfo.SpaceName, taskInfo.GraphName)
        if graph == nil {
                return nil, nil, fmt.Errorf("failed to retrieve graph with 
name: %s/%s", taskInfo.SpaceName, taskInfo.GraphName)
@@ -223,7 +225,7 @@ func (b *Broker) getAgentFromGraph(taskInfo 
*structure.TaskInfo) (*Agent, map[st
                return nil, nil, nil // waiting for the next check
        }
 
-       workers := make(map[string]*WorkerClient)
+       workers := make(map[string]*workers.WorkerClient)
 
        for _, w := range graph.Workers {
                wc := workerMgr.GetWorker(w.Name)
@@ -238,7 +240,7 @@ func (b *Broker) getAgentFromGraph(taskInfo 
*structure.TaskInfo) (*Agent, map[st
 
 }
 
-func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*WorkerClient, error) {
+func (b *Broker) getAgentFromWorker(taskInfo *structure.TaskInfo) (*Agent, 
map[string]*workers.WorkerClient, error) {
        group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
        return b.retrieveAgent(group), workerMgr.GroupWorkerMap(group), nil
 }
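
[Editor's note] For orientation, a hypothetical caller of the widened `ApplyAgent` signature. All types here are stand-ins; only the four-value return shape and the `forceApply` semantics (skipping the two busy checks) come from this diff:

```go
package main

import "fmt"

// Stand-ins for the broker types; the real definitions live in this package.
type Agent struct{ group string }
type WorkerClient struct{ Name string }
type TaskInfo struct{ ID int32 }
type AgentStatus string

const AgentStatusOk AgentStatus = "ok"

type Broker struct{}

// ApplyAgent mirrors the new signature: it now also returns the worker map,
// and the optional forceApply flag bypasses the agent/worker busy checks.
func (b *Broker) ApplyAgent(t *TaskInfo, forceApply ...bool) (*Agent, AgentStatus, map[string]*WorkerClient, error) {
	return &Agent{group: "g1"}, AgentStatusOk, map[string]*WorkerClient{"w1": {Name: "w1"}}, nil
}

func main() {
	b := &Broker{}
	task := &TaskInfo{ID: 1}
	agent, status, workerMap, err := b.ApplyAgent(task, true) // force past the busy checks
	if err != nil || status != AgentStatusOk {
		fmt.Println("not assigned:", status, err)
		return
	}
	fmt.Printf("task %d assigned to group %s with %d workers\n", task.ID, agent.group, len(workerMap))
}
```
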
diff --git a/vermeer/apps/master/schedules/scheduler_algorithm_manager.go 
b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go
new file mode 100644
index 00000000..c65e9f4d
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_algorithm_manager.go
@@ -0,0 +1,524 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+       "slices"
+       "sort"
+       "strconv"
+       "time"
+       "vermeer/apps/common"
+       "vermeer/apps/structure"
+
+       "github.com/sirupsen/logrus"
+)
+
+// SchedulerAlgorithm is the common interface for filter and scheduling algorithms.
+type SchedulerAlgorithm interface {
+       // Name returns the name of the SchedulerAlgorithm
+       Name() string
+       // Init initializes the SchedulerAlgorithm
+       Init()
+       // FilterNextTasks filters the next tasks to be scheduled based on the 
provided parameters
+       FilterNextTasks(allTasks []*structure.TaskInfo, taskToWorkerGroupMap 
map[int32]string, idleWorkerGroups []string, concurrentWorkerGroups []string, 
softSchedule bool) ([]*structure.TaskInfo, error)
+       // ScheduleNextTasks schedules the next tasks based on the filtered 
tasks
+       ScheduleNextTasks(filteredTasks []*structure.TaskInfo, 
taskToWorkerGroupMap map[int32]string, idleWorkerGroups []string, 
concurrentWorkerGroups []string, softSchedule bool) ([]*structure.TaskInfo, 
error)
+}
+
+// SchedulerAlgorithmManager owns the registered filter and scheduling
+// algorithms and the dispatch-paused flag.
+type SchedulerAlgorithmManager struct {
+       // filter algorithms must run in registration order (Depends before
+       // Waiting), so they live in a slice rather than a map, whose
+       // iteration order is randomized in Go
+       filteredSchedulerAlgorithms  []SchedulerAlgorithm
+       scheduledSchedulerAlgorithms map[string]SchedulerAlgorithm
+       dispatchPaused               bool
+}
+
+// Init registers the default filter and scheduling algorithms.
+// DependsSchedulerAlgorithm must be registered before WaitingSchedulerAlgorithm.
+func (am *SchedulerAlgorithmManager) Init() {
+       am.filteredSchedulerAlgorithms = make([]SchedulerAlgorithm, 0)
+       am.scheduledSchedulerAlgorithms = make(map[string]SchedulerAlgorithm)
+       am.dispatchPaused = false
+       // Register filter algorithms (order matters)
+       am.RegisterFilterAlgorithm(&DependsSchedulerAlgorithm{})
+       am.RegisterFilterAlgorithm(&WaitingSchedulerAlgorithm{})
+       // Register the default scheduling algorithm
+       am.RegisterSchedulerAlgorithm(&PriorityElderSchedulerAlgorithm{})
+}
+
+// RegisterSchedulerAlgorithm registers a scheduling algorithm; only one
+// scheduling algorithm is supported for now.
+func (am *SchedulerAlgorithmManager) 
RegisterSchedulerAlgorithm(schedulerAlgorithm SchedulerAlgorithm) {
+       if schedulerAlgorithm == nil {
+               return
+       }
+       name := schedulerAlgorithm.Name()
+       if _, exists := am.scheduledSchedulerAlgorithms[name]; exists {
+               return // SchedulerAlgorithm already registered
+       }
+
+       // only support one scheduling algorithm for now
+       if len(am.scheduledSchedulerAlgorithms) > 0 {
+               return // Only one scheduling algorithm can be registered
+       }
+       schedulerAlgorithm.Init()
+       am.scheduledSchedulerAlgorithms[name] = schedulerAlgorithm
+}
+
+// RegisterFilterAlgorithm appends a filter algorithm; filters run in the
+// order in which they were registered.
+func (am *SchedulerAlgorithmManager) RegisterFilterAlgorithm(filterAlgorithm SchedulerAlgorithm) {
+       if filterAlgorithm == nil {
+               return
+       }
+       name := filterAlgorithm.Name()
+       for _, registered := range am.filteredSchedulerAlgorithms {
+               if registered.Name() == name {
+                       return // already registered
+               }
+       }
+       filterAlgorithm.Init()
+       am.filteredSchedulerAlgorithms = append(am.filteredSchedulerAlgorithms, filterAlgorithm)
+}
+
+// IsDispatchPaused reports whether dispatch is paused.
+func (am *SchedulerAlgorithmManager) IsDispatchPaused() bool {
+       return am.dispatchPaused
+}
+
+// PauseDispatch pauses dispatch.
+func (am *SchedulerAlgorithmManager) PauseDispatch() {
+       am.dispatchPaused = true
+}
+
+// ResumeDispatch resumes dispatch.
+func (am *SchedulerAlgorithmManager) ResumeDispatch() {
+       am.dispatchPaused = false
+}
+
+// ScheduleNextTasks runs every registered filter over the candidate tasks in
+// order, then hands the surviving tasks to the scheduling algorithm.
+// Only one scheduling algorithm is supported for now.
+func (am *SchedulerAlgorithmManager) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if am.dispatchPaused {
+               return nil, nil // No tasks to schedule if dispatch is paused
+       }
+
+       filteredTasks := allTasks
+       for _, algorithm := range am.filteredSchedulerAlgorithms {
+               var err error
+               filteredTasks, err = algorithm.FilterNextTasks(filteredTasks, 
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       if len(filteredTasks) == 0 {
+               return nil, nil // No tasks to schedule after filtering
+       }
+
+       // only support one scheduling algorithm for now
+       // get first algorithm
+       for _, algorithm := range am.scheduledSchedulerAlgorithms {
+               tasks, err := algorithm.ScheduleNextTasks(filteredTasks, 
taskToWorkerGroupMap, idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+               if err != nil {
+                       return nil, err
+               }
+               return tasks, nil // Return the scheduled tasks
+       }
+
+       return nil, nil // No tasks scheduled
+}
+
+type FIFOSchedulerAlgorithm struct{}
+
+func (f *FIFOSchedulerAlgorithm) Name() string {
+       return "FIFO"
+}
+
+func (f *FIFOSchedulerAlgorithm) Init() {
+       // No specific initialization needed for FIFO
+       logrus.Info("Initializing FIFOSchedulerAlgorithm")
+}
+
+func (f *FIFOSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // just return the waiting tasks as is for FIFO
+       return allTasks, nil
+}
+
+func (f *FIFOSchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       // For FIFO, we simply return the available tasks in the order they are 
provided
+       for _, task := range allTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       // only support idle worker groups for now
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
+
+type PrioritySchedulerAlgorithm struct{}
+
+func (p *PrioritySchedulerAlgorithm) Name() string {
+       return "Priority"
+}
+
+func (p *PrioritySchedulerAlgorithm) Init() {
+       // No specific initialization needed for Priority
+       logrus.Info("Initializing PrioritySchedulerAlgorithm")
+}
+
+func (p *PrioritySchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // just return the waiting tasks as is for Priority
+       return allTasks, nil
+}
+
+func (p *PrioritySchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       // Sort tasks by priority (higher priority first)
+       sort.Slice(allTasks, func(i, j int) bool {
+               return allTasks[i].Priority > allTasks[j].Priority
+       })
+
+       for _, task := range allTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       // only support idle worker groups for now
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
+
+type PriorityElderSchedulerAlgorithm struct {
+       ageParam         int64
+       priorityParam    int64
+       resourceParam    int64
+       randomValueParam int64
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Name() string {
+       return "PriorityElder"
+}
+
+func (p *PriorityElderSchedulerAlgorithm) Init() {
+       logrus.Info("Initializing PriorityElderSchedulerAlgorithm")
+
+       // Initialize parameters with default values
+       defaultAgeParam := "1"
+       defaultPriorityParam := "1"
+       defaultResourceParam := "10000000000"
+       defaultRandomValueParam := "1" // Placeholder for any random value logic
+
+       // Load parameters from configuration
+       ageParam := common.GetConfigDefault("priority_elder_age_param", 
defaultAgeParam).(string)
+       priorityParam := 
common.GetConfigDefault("priority_elder_priority_param", 
defaultPriorityParam).(string)
+       resourceParam := 
common.GetConfigDefault("priority_elder_resource_param", 
defaultResourceParam).(string)
+       randomValueParam := 
common.GetConfigDefault("priority_elder_random_value_param", 
defaultRandomValueParam).(string)
+
+       ageParamInt, err := strconv.Atoi(ageParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_age_param to 
int: %v", err)
+               logrus.Infof("using default priority_elder_age_param: %s", 
defaultAgeParam)
+               ageParamInt, _ = strconv.Atoi(defaultAgeParam)
+       }
+       p.ageParam = int64(ageParamInt)
+       priorityParamInt, err := strconv.Atoi(priorityParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_priority_param 
to int: %v", err)
+               logrus.Infof("using default priority_elder_priority_param: %s", 
defaultPriorityParam)
+               priorityParamInt, _ = strconv.Atoi(defaultPriorityParam)
+       }
+       p.priorityParam = int64(priorityParamInt)
+       resourceParamInt, err := strconv.Atoi(resourceParam)
+       if err != nil {
+               logrus.Errorf("failed to convert priority_elder_resource_param 
to int: %v", err)
+               logrus.Infof("using default priority_elder_resource_param: %s", 
defaultResourceParam)
+               resourceParamInt, _ = strconv.Atoi(defaultResourceParam)
+       }
+       p.resourceParam = int64(resourceParamInt)
+       randomValueParamInt, err := strconv.Atoi(randomValueParam)
+       if err != nil {
+               logrus.Errorf("failed to convert 
priority_elder_random_value_param to int: %v", err)
+               logrus.Infof("using default priority_elder_random_value_param: 
%s", defaultRandomValueParam)
+               randomValueParamInt, _ = strconv.Atoi(defaultRandomValueParam)
+       }
+       p.randomValueParam = int64(randomValueParamInt)
+
+       logrus.Infof("PriorityElderSchedulerAlgorithm initialized with 
parameters: ageParam=%d, priorityParam=%d, resourceParam=%d, 
randomValueParam=%d", p.ageParam, p.priorityParam, p.resourceParam, 
p.randomValueParam)
+}
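+
+// A hedged configuration sketch: the key names come from the lookups above,
+// while the section and values are illustrative, not shipped defaults:
+//
+//   [default]
+//   priority_elder_age_param=1
+//   priority_elder_priority_param=1
+//   priority_elder_resource_param=10000000000
+//   priority_elder_random_value_param=1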
+
+func (p *PriorityElderSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       // PriorityElder performs no filtering; return all tasks unchanged
+       return allTasks, nil
+}
+
+func (p *PriorityElderSchedulerAlgorithm) CalculateTaskEmergency(task 
*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, printValue bool) 
int64 {
+       // step 0: get params
+       ageParam := p.ageParam
+       priorityParam := p.priorityParam
+       resourceParam := p.resourceParam
+       randomValueParam := p.randomValueParam
+       // step 1: age
+       ageCost := ageParam * time.Since(task.CreateTime).Milliseconds() / 1000 
// in seconds
+       // step 2: priority
+       priorityCost := priorityParam * int64(task.Priority)
+       // step 3: resource cost
+       graph := structure.GraphManager.GetGraphByName(task.SpaceName, 
task.GraphName)
+       resourceCost := int64(0)
+       if graph == nil {
+               resourceCost = resourceParam // if graph not found, use max 
resource cost
+       } else {
+               resourceCost = resourceParam / max(1, 
graph.VertexCount+graph.EdgeCount) // Avoid division by zero, ensure at least 1
+       }
+       // step 4: some random value
+       randomValue := randomValueParam // placeholder for future randomization logic
+       if printValue {
+               logrus.Debugf("Task %d: Age Cost: %d, Priority Cost: %d, 
Resource Cost: %d, Random Value: %d", task.ID, ageCost, priorityCost, 
resourceCost, randomValue)
+       }
+       return ageCost + priorityCost + resourceCost + randomValue
+}
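+
+// Worked example under the default parameters (the task and graph sizes are
+// hypothetical): a task created 120s ago with Priority=5 on a graph where
+// VertexCount+EdgeCount = 1,000,000 scores
+//   1*120 + 1*5 + 10000000000/1000000 + 1 = 10126
+// so long-waiting, high-priority tasks on small graphs are picked first.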
+
+func (p *PriorityElderSchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       // calculate emergency value for each task
+       taskEmergencies := make(map[int32]int64)
+       for _, task := range allTasks {
+               taskEmergencies[task.ID] = p.CalculateTaskEmergency(task, 
taskToWorkerGroupMap, false)
+       }
+
+       // Sort tasks by emergency value (higher value first)
+       sort.Slice(allTasks, func(i, j int) bool {
+               return taskEmergencies[allTasks[i].ID] > 
taskEmergencies[allTasks[j].ID]
+       })
+
+       for _, task := range allTasks {
+               logrus.Debugf("Task %d: Emergency Value: %d", task.ID, 
taskEmergencies[task.ID])
+       }
+
+       for _, task := range allTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+                       // if allow concurrent running, check if the group is 
in concurrent worker groups
+                       if !task.Exclusive {
+                               for _, concurrentGroup := range 
concurrentWorkerGroups {
+                                       if group == concurrentGroup {
+                                               logrus.Debugf("Task %d is 
assigned to concurrent worker group %s", task.ID, group)
+                                               return 
[]*structure.TaskInfo{task}, nil // Return the first task that can be scheduled
+                                       }
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
+
+type WaitingSchedulerAlgorithm struct{}
+
+func (w *WaitingSchedulerAlgorithm) Name() string {
+       return "Waiting"
+}
+
+func (w *WaitingSchedulerAlgorithm) Init() {
+       // No specific initialization needed for Waiting
+       logrus.Info("Initializing WaitingSchedulerAlgorithm")
+}
+
+func (w *WaitingSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       waitingTasks := make([]*structure.TaskInfo, 0)
+       for _, task := range allTasks {
+               if task.State == structure.TaskStateWaiting {
+                       waitingTasks = append(waitingTasks, task)
+               }
+       }
+       return waitingTasks, nil
+}
+
+func (w *WaitingSchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       waitingTasks, err := w.FilterNextTasks(allTasks, taskToWorkerGroupMap, 
idleWorkerGroups, concurrentWorkerGroups, softSchedule)
+       if err != nil {
+               return nil, err
+       }
+       if len(waitingTasks) == 0 {
+               return nil, nil
+       }
+       for _, task := range waitingTasks {
+               if task.State != structure.TaskStateWaiting {
+                       continue // Only consider tasks that are in the waiting 
state
+               }
+               if group, exists := taskToWorkerGroupMap[task.ID]; exists && 
group != "" {
+                       // only support idle worker groups for now
+                       for _, idleGroup := range idleWorkerGroups {
+                               if group == idleGroup {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+       return nil, nil // No tasks scheduled
+}
+
+type DependsSchedulerAlgorithm struct{}
+
+func (d *DependsSchedulerAlgorithm) Name() string {
+       return "Depends"
+}
+
+func (d *DependsSchedulerAlgorithm) Init() {
+       // No specific initialization needed for Depends
+       logrus.Info("Initializing DependsSchedulerAlgorithm")
+}
+
+func (d *DependsSchedulerAlgorithm) FilterNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       sort.Slice(allTasks, func(i, j int) bool {
+               return allTasks[i].ID < allTasks[j].ID
+       })
+
+       taskIDs := make(map[int32]*structure.TaskInfo)
+       for _, task := range allTasks {
+               taskIDs[task.ID] = task
+       }
+
+       filteredTasks := make([]*structure.TaskInfo, 0)
+       for _, task := range allTasks {
+               depends := task.Preorders
+               // Check if all dependencies are satisfied
+               allDepsSatisfied := true
+               for _, dep := range depends {
+                       if depTask, exists := taskIDs[dep]; exists && 
depTask.State != structure.TaskStateComplete {
+                               allDepsSatisfied = false
+                               break
+                       }
+               }
+               if allDepsSatisfied {
+                       if group, exists := taskToWorkerGroupMap[task.ID]; 
exists && group != "" {
+                               filteredTasks = append(filteredTasks, task) // keep: dependencies satisfied and a worker group is assigned
+                       }
+               }
+       }
+       return filteredTasks, nil
+}
+
+func (d *DependsSchedulerAlgorithm) ScheduleNextTasks(allTasks 
[]*structure.TaskInfo, taskToWorkerGroupMap map[int32]string, idleWorkerGroups 
[]string, concurrentWorkerGroups []string, softSchedule bool) 
([]*structure.TaskInfo, error) {
+       if len(allTasks) == 0 {
+               return nil, nil // No tasks to schedule
+       }
+
+       sort.Slice(allTasks, func(i, j int) bool {
+               return allTasks[i].ID < allTasks[j].ID
+       })
+
+       allTaskIDs := make(map[int32]*structure.TaskInfo)
+       for _, task := range allTasks {
+               allTaskIDs[task.ID] = task
+       }
+
+       for _, task := range allTasks {
+               depends := task.Preorders
+               // Check if all dependencies are satisfied
+               allDepsSatisfied := true
+               for _, dep := range depends {
+                       if depTask, exists := allTaskIDs[dep]; exists && 
depTask.State != structure.TaskStateComplete {
+                               allDepsSatisfied = false
+                               break
+                       }
+               }
+               if allDepsSatisfied {
+                       if group, exists := taskToWorkerGroupMap[task.ID]; 
exists && group != "" {
+                               // only support idle worker groups for now
+                               if slices.Contains(idleWorkerGroups, group) {
+                                       logrus.Debugf("Task %d is assigned to 
worker group %s", task.ID, group)
+                                       return []*structure.TaskInfo{task}, nil 
// Return the first task that can be scheduled
+                               }
+                       }
+               }
+       }
+
+       return nil, nil
+}
diff --git a/vermeer/apps/master/schedules/scheduler_cron_manager.go 
b/vermeer/apps/master/schedules/scheduler_cron_manager.go
new file mode 100644
index 00000000..651e1b9e
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_cron_manager.go
@@ -0,0 +1,161 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+       "errors"
+       "vermeer/apps/structure"
+
+       "github.com/robfig/cron/v3"
+       "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerCronManager manages cron-based (periodic) task templates.
+* @Note: On each cron trigger, it queues a fresh copy of the template task.
+ */
+type SchedulerCronManager struct {
+       cronTasks map[int32][]*structure.TaskInfo // origin task ID to its copied tasks
+       crons     map[int32][]*cron.Cron          // origin task ID to its cron jobs
+       // queueTemplateHandler is a function that handles the task queue
+       queueTemplateHandler func(*structure.TaskInfo) (int32, error)
+}
+
+/*
+* @Description: Init initializes the SchedulerCronManager.
+* @Note: queueTemplateHandler is invoked to copy and queue a task on each trigger.
+* @Param queueTemplateHandler
+* @Return *SchedulerCronManager
+ */
+func (t *SchedulerCronManager) Init(queueTemplateHandler 
func(*structure.TaskInfo) (int32, error)) *SchedulerCronManager {
+       t.cronTasks = make(map[int32][]*structure.TaskInfo)
+       t.crons = make(map[int32][]*cron.Cron)
+       t.queueTemplateHandler = queueTemplateHandler
+       return t
+}
+
+/*
+* @Description: CheckCronExpression validates a standard cron expression.
+* @Note: Returns a descriptive error when parsing fails.
+* @Param cronExpr
+* @Return error
+ */
+func (t *SchedulerCronManager) CheckCronExpression(cronExpr string) error {
+       if cronExpr == "" {
+               return errors.New("cron expression is empty")
+       }
+       if _, err := cron.ParseStandard(cronExpr); err != nil {
+               logrus.Errorf("Failed to parse cron expression: %v", err)
+               return errors.New("invalid cron expression: " + err.Error())
+       }
+       return nil
+}
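+
+// For example (a sketch; the expressions are illustrative):
+//
+//   mgr.CheckCronExpression("*/5 * * * *") // nil: a valid five-field spec
+//   mgr.CheckCronExpression("not-a-cron")  // error: invalid cron expression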
+
+/*
+* @Description: AddCronTask registers a cron job for the given task template.
+* @Note: The job re-queues a copy of the task on every trigger.
+* @Param taskInfo
+* @Return error
+ */
+func (t *SchedulerCronManager) AddCronTask(taskInfo *structure.TaskInfo) error 
{
+       if taskInfo == nil {
+               return errors.New("the argument `taskInfo` is nil")
+       }
+
+       if taskInfo.CronExpr == "" {
+               return errors.New("the property `CronExpr` of taskInfo is 
empty")
+       }
+
+       // add to cron tasks
+       cronJob := cron.New()
+       _, err := cronJob.AddFunc(taskInfo.CronExpr, func() {
+               if taskInfo == nil {
+                       return
+               }
+
+               // Create a new task copied from the original template;
+               // copying and queueing are handled by queueTemplateHandler (task_bl).
+               newID, err := t.queueTemplateHandler(taskInfo)
+               if err != nil {
+                       logrus.Errorf("Failed to queue task %d in cron job: 
%v", taskInfo.ID, err)
+                       return
+               }
+               logrus.Infof("Successfully queued task %d from cron job", newID)
+       })
+       if err != nil {
+               logrus.Errorf("Failed to add cron job for task %d: %v", 
taskInfo.ID, err)
+               return err
+       }
+       t.cronTasks[taskInfo.ID] = append(t.cronTasks[taskInfo.ID], taskInfo)
+       t.crons[taskInfo.ID] = append(t.crons[taskInfo.ID], cronJob)
+       cronJob.Start()
+       logrus.Infof("Added cron task for task ID %d with expression %s", 
taskInfo.ID, taskInfo.CronExpr)
+       return nil
+}
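+
+// Minimal usage sketch (the handler and template below are assumptions, not
+// part of this package):
+//
+//   mgr := new(SchedulerCronManager).Init(queueCopyOfTemplate)
+//   template.CronExpr = "0 * * * *" // hourly
+//   _ = mgr.AddCronTask(template)   // queues a fresh copy every hour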
+
+/*
+* @Description: DeleteTask removes the cron task.
+* @Note: Stops every cron job registered under the given task ID.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerCronManager) DeleteTask(taskID int32) error {
+       if _, exists := t.cronTasks[taskID]; !exists {
+               return errors.New("task not found in cron tasks")
+       }
+
+       for _, cronJob := range t.crons[taskID] {
+               cronJob.Stop()
+       }
+       delete(t.cronTasks, taskID)
+       delete(t.crons, taskID)
+       logrus.Infof("Deleted cron task for task ID %d", taskID)
+       return nil
+}
+
+/*
+* @Description: DeleteTaskByGraph removes cron tasks bound to a graph.
+* @Note: Deletes every cron task whose template targets the given space and graph.
+* @Param spaceName
+* @Param graphName
+* @Return error
+ */
+func (t *SchedulerCronManager) DeleteTaskByGraph(spaceName, graphName string) 
error {
+       if spaceName == "" || graphName == "" {
+               return errors.New("the argument `spaceName` or `graphName` is 
empty")
+       }
+
+       var toDelete []int32
+       for taskID, tasks := range t.cronTasks {
+               for _, task := range tasks {
+                       if task.SpaceName == spaceName && task.GraphName == 
graphName {
+                               toDelete = append(toDelete, taskID)
+                               break
+                       }
+               }
+       }
+
+       for _, taskID := range toDelete {
+               if err := t.DeleteTask(taskID); err != nil {
+                       logrus.Errorf("Failed to delete cron task for task ID 
%d: %v", taskID, err)
+                       return err
+               }
+       }
+       logrus.Infof("Deleted cron tasks for space %s and graph %s", spaceName, 
graphName)
+       return nil
+}
diff --git a/vermeer/apps/master/schedules/scheduler_resource_manager.go 
b/vermeer/apps/master/schedules/scheduler_resource_manager.go
new file mode 100644
index 00000000..8dabd3d0
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_resource_manager.go
@@ -0,0 +1,258 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+       "errors"
+       "vermeer/apps/structure"
+
+       "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: WorkerOngoingStatus describes a worker's current scheduling status.
+* @Note: Also applied to worker groups, derived from their member workers.
+ */
+type WorkerOngoingStatus string
+
+const (
+       WorkerOngoingStatusIdle              WorkerOngoingStatus = "idle"
+       WorkerOngoingStatusRunning           WorkerOngoingStatus = "running"
+       WorkerOngoingStatusConcurrentRunning WorkerOngoingStatus = 
"concurrent_running"
+       WorkerOngoingStatusPaused            WorkerOngoingStatus = "paused"
+       WorkerOngoingStatusDeleted           WorkerOngoingStatus = "deleted"
+)
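+
+// Status lifecycle as implemented below: a worker starts idle, becomes
+// running (exclusive task) or concurrent_running (non-exclusive tasks), and
+// returns to idle once ReleaseByTaskID drops its group's last running task;
+// deleted removes the worker from tracking entirely.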
+
+/*
+* @Description: SchedulerResourceManager tracks worker and worker-group status for the scheduler.
+* @Note: It applies for agents through the broker and releases resources when tasks finish.
+ */
+type SchedulerResourceManager struct {
+       structure.MutexLocker
+       workerStatus            map[string]WorkerOngoingStatus
+       workerGroupStatus       map[string]WorkerOngoingStatus
+       runningWorkerGroupTasks map[string][]int32 // worker group name to list 
of running task IDs
+       // The broker is only responsible for communicating with workers;
+       // it cannot assign tasks to workers directly.
+       broker *Broker
+}
+
+/*
+* @Description: Init initializes the SchedulerResourceManager.
+* @Note: Creates the status maps and the broker.
+ */
+func (rm *SchedulerResourceManager) Init() {
+       rm.workerStatus = make(map[string]WorkerOngoingStatus)
+       rm.workerGroupStatus = make(map[string]WorkerOngoingStatus)
+       rm.runningWorkerGroupTasks = make(map[string][]int32)
+       rm.broker = new(Broker).Init()
+}
+
+/*
+* @Description: ReleaseByTaskID releases worker-group resources held by a task.
+* @Note: Recomputes worker and group status after the task is removed.
+* @Param taskID
+ */
+func (rm *SchedulerResourceManager) ReleaseByTaskID(taskID int32) {
+       defer rm.Unlock(rm.Lock())
+
+       for workerGroup, status := range rm.workerGroupStatus {
+               if (status == WorkerOngoingStatusRunning || status == 
WorkerOngoingStatusConcurrentRunning) && 
rm.isTaskRunningOnWorkerGroup(workerGroup, taskID) {
+                       delete(rm.workerGroupStatus, workerGroup)
+                       if tasks, exists := 
rm.runningWorkerGroupTasks[workerGroup]; exists {
+                               for i, id := range tasks {
+                                       if id == taskID {
+                                               
rm.runningWorkerGroupTasks[workerGroup] = append(tasks[:i], tasks[i+1:]...)
+                                               if 
len(rm.runningWorkerGroupTasks[workerGroup]) == 0 {
+                                                       
delete(rm.runningWorkerGroupTasks, workerGroup)
+                                               }
+                                               break
+                                       }
+                               }
+                       }
+                       if tasks, exists := 
rm.runningWorkerGroupTasks[workerGroup]; !exists || len(tasks) == 0 {
+                               for _, worker := range 
workerMgr.GetGroupWorkers(workerGroup) {
+                                       rm.changeWorkerStatus(worker.Name, 
WorkerOngoingStatusIdle)
+                               }
+                       } else {
+                               for _, worker := range 
workerMgr.GetGroupWorkers(workerGroup) {
+                                       rm.changeWorkerStatus(worker.Name, 
WorkerOngoingStatusConcurrentRunning)
+                               }
+                       }
+               }
+       }
+}
+
+/*
+* @Description: isTaskRunningOnWorkerGroup reports whether the task is running on the worker group.
+* @Param workerGroup
+* @Param taskID
+* @Return bool
+ */
+func (rm *SchedulerResourceManager) isTaskRunningOnWorkerGroup(workerGroup 
string, taskID int32) bool {
+       if tasks, exists := rm.runningWorkerGroupTasks[workerGroup]; exists {
+               for _, id := range tasks {
+                       if id == taskID {
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+/*
+* @Description: GetAgentAndAssignTask applies for an agent and assigns the task to it.
+* @Note: Marks the target worker group as running or concurrently running.
+* @Param taskInfo
+* @Return *Agent, AgentStatus, error
+ */
+func (rm *SchedulerResourceManager) GetAgentAndAssignTask(taskInfo 
*structure.TaskInfo) (*Agent, AgentStatus, error) {
+       if taskInfo == nil {
+               return nil, AgentStatusError, errors.New("taskInfo is nil")
+       }
+
+       defer rm.Unlock(rm.Lock())
+
+       agent, status, workers, err := rm.broker.ApplyAgent(taskInfo, 
!taskInfo.Exclusive)
+       if err != nil {
+               return nil, AgentStatusError, err
+       }
+       if agent == nil {
+               return nil, status, nil
+       }
+
+       // Assign the task to the agent
+       agent.AssignTask(taskInfo)
+
+       exclusive := taskInfo.Exclusive
+       runningStatus := WorkerOngoingStatusRunning
+       if _, exists := rm.runningWorkerGroupTasks[agent.GroupName()]; !exists 
&& exclusive {
+               rm.runningWorkerGroupTasks[agent.GroupName()] = []int32{}
+               runningStatus = WorkerOngoingStatusRunning
+               rm.workerGroupStatus[agent.GroupName()] = runningStatus
+       } else {
+               runningStatus = WorkerOngoingStatusConcurrentRunning
+               rm.workerGroupStatus[agent.GroupName()] = runningStatus
+       }
+       rm.runningWorkerGroupTasks[agent.GroupName()] = 
append(rm.runningWorkerGroupTasks[agent.GroupName()], taskInfo.ID)
+
+       for _, worker := range workers {
+               if worker == nil {
+                       continue
+               }
+               rm.workerStatus[worker.Name] = runningStatus
+       }
+
+       return agent, status, nil
+}
+
+/*
+* @Description: GetIdleWorkerGroups returns the names of all idle worker groups.
+* @Return []string
+ */
+func (rm *SchedulerResourceManager) GetIdleWorkerGroups() []string {
+       defer rm.Unlock(rm.Lock())
+
+       idleWorkerGroups := make([]string, 0)
+       for workerGroup, status := range rm.workerGroupStatus {
+               if status == WorkerOngoingStatusIdle {
+                       idleWorkerGroups = append(idleWorkerGroups, workerGroup)
+               }
+       }
+       return idleWorkerGroups
+}
+
+/*
+* @Description: GetConcurrentWorkerGroups returns groups currently running non-exclusive tasks.
+* @Return []string
+ */
+func (rm *SchedulerResourceManager) GetConcurrentWorkerGroups() []string {
+       defer rm.Unlock(rm.Lock())
+
+       concurrentWorkerGroups := make([]string, 0)
+       for workerGroup, status := range rm.workerGroupStatus {
+               if status == WorkerOngoingStatusConcurrentRunning {
+                       concurrentWorkerGroups = append(concurrentWorkerGroups, 
workerGroup)
+               }
+       }
+       return concurrentWorkerGroups
+}
+
+/*
+* @Description: changeWorkerStatus updates a worker's status and re-derives its group's status.
+* @Param workerName
+* @Param status
+ */
+func (rm *SchedulerResourceManager) changeWorkerStatus(workerName string, 
status WorkerOngoingStatus) {
+       rm.workerStatus[workerName] = status
+
+       if status == WorkerOngoingStatusIdle || status == 
WorkerOngoingStatusConcurrentRunning {
+               workerInfo := workerMgr.GetWorkerInfo(workerName)
+
+               if workerInfo == nil {
+                       logrus.Warnf("worker '%s' not found", workerName)
+                       return
+               }
+
+               // get worker group name
+               groupName := workerInfo.Group
+               if groupName != "" {
+                       gws := workerMgr.GetGroupWorkers(groupName)
+                       allIdle := true
+                       allConcurrent := true
+                       for _, w := range gws {
+                               st := rm.workerStatus[w.Name]
+                               if st != WorkerOngoingStatusIdle {
+                                       allIdle = false
+                               }
+                               if st != WorkerOngoingStatusConcurrentRunning {
+                                       allConcurrent = false
+                               }
+                       }
+                       if allConcurrent || allIdle {
+                               newStatus := WorkerOngoingStatusIdle
+                               if allConcurrent {
+                                       newStatus = 
WorkerOngoingStatusConcurrentRunning
+                               }
+                               logrus.Debugf("Change worker group '%s' status 
to '%s' (derived from %d workers)", groupName, newStatus, len(gws))
+                               rm.changeWorkerGroupStatus(groupName, newStatus)
+                       }
+               }
+
+       } else if status == WorkerOngoingStatusDeleted {
+               delete(rm.workerStatus, workerName)
+       }
+
+       // TODO: Other status changes can be handled here if needed
+}
+
+func (rm *SchedulerResourceManager) changeWorkerGroupStatus(workerGroup 
string, status WorkerOngoingStatus) {
+       logrus.Infof("Change worker group '%s' status to '%s'", workerGroup, 
status)
+       rm.workerGroupStatus[workerGroup] = status
+}
+
+// TODO: when sync task created, need to alloc worker?
+func (rm *SchedulerResourceManager) ChangeWorkerStatus(workerName string, 
status WorkerOngoingStatus) {
+       defer rm.Unlock(rm.Lock())
+
+       rm.changeWorkerStatus(workerName, status)
+}
diff --git a/vermeer/apps/master/schedules/scheduler_task_manager.go 
b/vermeer/apps/master/schedules/scheduler_task_manager.go
new file mode 100644
index 00000000..21767a39
--- /dev/null
+++ b/vermeer/apps/master/schedules/scheduler_task_manager.go
@@ -0,0 +1,291 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package schedules
+
+import (
+       "errors"
+       "vermeer/apps/common"
+       "vermeer/apps/structure"
+
+       "github.com/sirupsen/logrus"
+)
+
+/*
+* @Description: SchedulerTaskManager tracks every task known to the scheduler.
+* @Note: Maintains the task queue, completion state, and task-to-worker-group mapping.
+ */
+type SchedulerTaskManager struct {
+       structure.MutexLocker
+       // allTaskMap maps task ID to TaskInfo for every task the scheduler has seen.
+       allTaskMap   map[int32]*structure.TaskInfo
+       allTaskQueue []*structure.TaskInfo
+       // For debug or test, get task start sequence
+       startTaskQueue []*structure.TaskInfo
+       // notCompleteTasks holds tasks that have not yet finished
+       notCompleteTasks map[int32]*structure.TaskInfo
+       // taskToworkerGroupMap records which worker group handles which task.
+       taskToworkerGroupMap map[int32]string
+}
+
+/*
+* @Description: Init initializes the SchedulerTaskManager's internal maps.
+* @Return *SchedulerTaskManager
+ */
+func (t *SchedulerTaskManager) Init() *SchedulerTaskManager {
+       t.allTaskMap = make(map[int32]*structure.TaskInfo)
+       t.notCompleteTasks = make(map[int32]*structure.TaskInfo)
+       t.taskToworkerGroupMap = make(map[int32]string)
+       return t
+}
+
+/*
+* @Description: QueueTask adds a task to the queue and assigns it a worker group.
+* @Param taskInfo
+* @Return bool, error
+ */
+func (t *SchedulerTaskManager) QueueTask(taskInfo *structure.TaskInfo) (bool, 
error) {
+       if taskInfo == nil {
+               return false, errors.New("the argument `taskInfo` is nil")
+       }
+
+       if taskInfo.SpaceName == "" {
+               return false, errors.New("the property `SpaceName` of taskInfo 
is empty")
+       }
+
+       defer t.Unlock(t.Lock())
+
+       // Add the task to the task map
+       t.allTaskMap[taskInfo.ID] = taskInfo
+       t.allTaskQueue = append(t.allTaskQueue, taskInfo)
+       t.notCompleteTasks[taskInfo.ID] = taskInfo
+       t.AssignGroup(taskInfo)
+       return true, nil
+}
+
+/*
+* @Description: RefreshTaskToWorkerGroupMap re-assigns worker groups for all incomplete tasks.
+ */
+func (t *SchedulerTaskManager) RefreshTaskToWorkerGroupMap() {
+       defer t.Unlock(t.Lock())
+
+       for _, taskInfo := range t.GetAllTasksNotComplete() {
+               if taskInfo == nil {
+                       continue
+               }
+               // AssignGroup already refreshes taskToworkerGroupMap for this task
+               if err := t.AssignGroup(taskInfo); err != nil {
+                       logrus.Warnf("failed to assign group for task %d: %v", taskInfo.ID, err)
+               }
+       }
+}
+
+/*
+* @Description: AddTaskStartSequence records that the given task has started.
+* @Note: Debug-mode only; used by tests to verify scheduling order.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) AddTaskStartSequence(taskID int32) error {
+       if common.GetConfig("debug_mode").(string) != "debug" {
+               logrus.Warn("TaskStartSequence called but debug features are 
disabled")
+               return nil
+       }
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       t.startTaskQueue = append(t.startTaskQueue, t.allTaskMap[taskID])
+       return nil
+}
+
+/*
+* @Description: RemoveTask removes the task.
+* @Note: Drops it from the task map, queue, group map, and incomplete set.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) RemoveTask(taskID int32) error {
+       // take the lock before checking so the lookup and delete are atomic
+       defer t.Unlock(t.Lock())
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       delete(t.allTaskMap, taskID)
+       // remove from queue
+       for i, task := range t.allTaskQueue {
+               if task.ID == taskID {
+                       t.allTaskQueue = append(t.allTaskQueue[:i], 
t.allTaskQueue[i+1:]...)
+                       break
+               }
+       }
+       delete(t.taskToworkerGroupMap, taskID)
+       delete(t.notCompleteTasks, taskID)
+       return nil
+}
+
+/*
+* @Description: MarkTaskComplete marks the task complete.
+* @Note: Removes it from the incomplete set; the task stays in allTaskMap.
+* @Param taskID
+* @Return error
+ */
+func (t *SchedulerTaskManager) MarkTaskComplete(taskID int32) error {
+       // take the lock before checking so the lookup and delete are atomic
+       defer t.Unlock(t.Lock())
+       if _, exists := t.allTaskMap[taskID]; !exists {
+               return errors.New("task not found")
+       }
+       delete(t.notCompleteTasks, taskID)
+       return nil
+}
+
+/*
+* @Description: AssignGroup assigns a worker group to the task.
+* @Note: Stores the mapping in taskToworkerGroupMap.
+* @Param taskInfo
+* @Return error
+ */
+func (t *SchedulerTaskManager) AssignGroup(taskInfo *structure.TaskInfo) error 
{
+       group := workerMgr.ApplyGroup(taskInfo.SpaceName, taskInfo.GraphName)
+       if group == "" {
+               return errors.New("failed to assign group for task")
+       }
+       t.taskToworkerGroupMap[taskInfo.ID] = group
+       return nil
+}
+
+/*
+* @Description: GetTaskByID returns the task with the given ID, or an error if unknown.
+* @Param taskID
+* @Return *structure.TaskInfo, error
+ */
+func (t *SchedulerTaskManager) GetTaskByID(taskID int32) (*structure.TaskInfo, 
error) {
+       task, exists := t.allTaskMap[taskID]
+       if !exists {
+               return nil, errors.New("task not found")
+       }
+       return task, nil
+}
+
+/*
+* @Description: GetLastTask returns the most recently queued task for a space.
+* @Param spaceName
+* @Return *structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetLastTask(spaceName string) 
*structure.TaskInfo {
+       // Scan the queue from the tail for the newest task in the given space
+       if len(t.allTaskQueue) == 0 {
+               return nil
+       }
+       for i := len(t.allTaskQueue) - 1; i >= 0; i-- {
+               if t.allTaskQueue[i].SpaceName == spaceName {
+                       return t.allTaskQueue[i]
+               }
+       }
+       return nil
+}
+
+/*
+* @Description: GetAllTasks returns every task known to the manager.
+* @Return []*structure.TaskInfo
+ */
+func (t *SchedulerTaskManager) GetAllTasks() []*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+       for _, task := range t.allTaskMap {
+               tasks = append(tasks, task)
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksNotComplete() []*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0, len(t.notCompleteTasks))
+       for _, task := range t.notCompleteTasks {
+               tasks = append(tasks, task)
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetAllTasksWaiting() []*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0, len(t.allTaskMap))
+       for _, task := range t.GetAllTasksNotComplete() {
+               if task.State == structure.TaskStateWaiting {
+                       tasks = append(tasks, task)
+               }
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetTasksInQueue(space string) 
[]*structure.TaskInfo {
+       tasks := make([]*structure.TaskInfo, 0)
+       for _, task := range t.GetAllTasksNotComplete() {
+               if task.SpaceName == space {
+                       tasks = append(tasks, task)
+               }
+       }
+       return tasks
+}
+
+// Only for debug or test, get task start sequence
+func (t *SchedulerTaskManager) GetTaskStartSequence(queryTasks []int32) 
[]*structure.TaskInfo {
+       if len(t.startTaskQueue) == 0 {
+               return nil
+       }
+       if len(queryTasks) == 0 {
+               return t.startTaskQueue
+       }
+       tasks := make([]*structure.TaskInfo, 0, len(queryTasks))
+       taskSet := make(map[int32]struct{})
+       for _, id := range queryTasks {
+               taskSet[id] = struct{}{}
+       }
+       for _, task := range t.startTaskQueue {
+               if _, exists := taskSet[task.ID]; exists {
+                       tasks = append(tasks, task)
+               }
+       }
+       logrus.Infof("GetTaskStartSequence: return %d tasks", len(tasks))
+       for _, task := range tasks {
+               logrus.Debugf("TaskID: %d", task.ID)
+       }
+       return tasks
+}
+
+func (t *SchedulerTaskManager) GetTaskToWorkerGroupMap() map[int32]string {
+       // Return a copy of the worker group map to avoid external modifications
+       taskNotComplete := t.GetAllTasksNotComplete()
+       groupMap := make(map[int32]string, len(taskNotComplete))
+       for _, task := range taskNotComplete {
+               if group, exists := t.taskToworkerGroupMap[task.ID]; exists {
+                       groupMap[task.ID] = group
+               }
+       }
+       return groupMap
+}
+
+func (t *SchedulerTaskManager) IsTaskOngoing(taskID int32) bool {
+       // A task counts as ongoing while it remains in the created state
+       task, exists := t.allTaskMap[taskID]
+       if !exists {
+               return false
+       }
+       return task.State == structure.TaskStateCreated
+}
diff --git a/vermeer/apps/master/services/http_tasks.go 
b/vermeer/apps/master/services/http_tasks.go
index 03130ca8..de4c5eae 100644
--- a/vermeer/apps/master/services/http_tasks.go
+++ b/vermeer/apps/master/services/http_tasks.go
@@ -112,7 +112,7 @@ func handleTaskCreation(ctx *gin.Context, exeFunc 
func(*structure.TaskInfo) erro
        }
 
        filteredTask := taskBiz(ctx).FilteringTask(task)
-       ctx.JSON(http.StatusOK, TaskResp{Task: taskInfo2TaskJson(filteredTask)})
+       ctx.JSON(http.StatusOK, TaskCreateResponse{Task: 
taskInfo2TaskJson(filteredTask)})
 }
 
 type TaskCreateBatchHandler struct {
@@ -231,3 +231,33 @@ func (ch *ComputeValueHandler) GET(ctx *gin.Context) {
 
        ctx.JSON(http.StatusOK, resp)
 }
+
+type TaskStartSequenceHandler struct {
+       common.SenHandler
+}
+
+type TaskStartSequenceRequest struct {
+       QueryTasks []int32 `json:"query_tasks,omitempty"`
+}
+
+type TaskStartSequenceResp struct {
+       common.BaseResp
+       Sequence []int32 `json:"sequence,omitempty"`
+}
+
+func (tsh *TaskStartSequenceHandler) POST(ctx *gin.Context) {
+       req := TaskStartSequenceRequest{}
+       err := ctx.BindJSON(&req)
+       if isBad(err != nil, ctx, func() string { return fmt.Sprintf("request 
body not correct: %s", err) }) {
+               return
+       }
+
+       r := Scheduler.TaskStartSequence(req.QueryTasks)
+
+       sequence := make([]int32, 0, len(r))
+       for _, task := range r {
+               sequence = append(sequence, task.ID)
+       }
+
+       ctx.JSON(http.StatusOK, TaskStartSequenceResp{Sequence: sequence, 
BaseResp: common.BaseResp{ErrCode: 0, Message: "ok"}})
+}
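+
+// Example exchange (task IDs and JSON field names are illustrative):
+//
+//   POST /tasks/start_sequence   {"query_tasks":[12,11]}
+//   200 OK                       {"errcode":0,"message":"ok","sequence":[11,12]}
+//
+// when task 11 actually started before task 12.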
diff --git a/vermeer/apps/master/services/router.go 
b/vermeer/apps/master/services/router.go
index e1000d25..ac01e834 100644
--- a/vermeer/apps/master/services/router.go
+++ b/vermeer/apps/master/services/router.go
@@ -35,12 +35,13 @@ func SetRouters(sen *common.Sentinel, authFilters 
...gin.HandlerFunc) {
 
        // /tasks
        regVerAPI(sen, 1, "/tasks", map[string]common.BaseHandler{
-               "":        &TasksHandler{},
-               "/create": &TaskCreateHandler{},
-               // "/create/batch":   &TaskCreateBatchHandler{},
+               "":                &TasksHandler{},
+               "/create":         &TaskCreateHandler{},
+               "/create/batch":   &TaskCreateBatchHandler{},
                "/create/sync":    &TaskCreateSyncHandler{},
                "/oltp":           &OltpHandler{},
                "/value/:task_id": &ComputeValueHandler{},
+               "/start_sequence": &TaskStartSequenceHandler{},
        }, authFilters...)
 
        // /task
diff --git a/vermeer/apps/master/workers/worker_manager.go 
b/vermeer/apps/master/workers/worker_manager.go
index bfaec0b8..f2ea2fb6 100644
--- a/vermeer/apps/master/workers/worker_manager.go
+++ b/vermeer/apps/master/workers/worker_manager.go
@@ -577,6 +577,10 @@ func (wm *workerManager) getGroupWorkers(workerGroup 
string) []*WorkerClient {
        return workers
 }
 
+func (wm *workerManager) GetGroupWorkers(workerGroup string) []*WorkerClient {
+       return wm.getGroupWorkers(workerGroup)
+}
+
 func (wm *workerManager) getGroupWorkerMap(workerGroup string) 
map[string]*WorkerClient {
        workerMap := make(map[string]*WorkerClient)
 
diff --git a/vermeer/apps/structure/task.go b/vermeer/apps/structure/task.go
index 87356f2b..eb000069 100644
--- a/vermeer/apps/structure/task.go
+++ b/vermeer/apps/structure/task.go
@@ -55,6 +55,12 @@ type TaskInfo struct {
        wg               *sync.WaitGroup
        Action           int32
        StatisticsResult map[string]any
+
+       // for scheduler
+       Priority  int32
+       Preorders []int32
+       Exclusive bool   // if true, the task cannot run concurrently with other tasks
+       CronExpr  string // cron expression for scheduling
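+       // Illustrative template values (hypothetical, not defaults):
+       //   Priority: 10, Preorders: []int32{7}, Exclusive: true,
+       //   CronExpr: "*/10 * * * *" (re-queued every 10 minutes)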
 }
 
 func (ti *TaskInfo) SetState(state TaskState) {
diff --git a/vermeer/client/client.go b/vermeer/client/client.go
index d9d1a17f..34553aa3 100644
--- a/vermeer/client/client.go
+++ b/vermeer/client/client.go
@@ -150,6 +150,26 @@ func (vc *VermeerClient) GetWorkers() (*WorkersResponse, 
error) {
        return workersResp, err
 }
 
+func (vc *VermeerClient) AllocGroupGraph(graphName string, groupName string) 
(bool, error) {
+       reader, err := Request2Reader(struct{}{})
+       if err != nil {
+               return false, err
+       }
+       resp, err := 
vc.post(vc.httpAddr+"/admin/workers/alloc/"+groupName+"/$DEFAULT/"+graphName, 
reader)
+       if err != nil {
+               return false, err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               respByte, err := ParseResponse2Byte(resp)
+               if err != nil {
+                       return false, err
+               }
+               return false, fmt.Errorf("response:%s", string(respByte))
+       }
+       return true, nil
+}
+
 func (vc *VermeerClient) GetMaster() (*MasterResponse, error) {
        resp, err := vc.get(vc.httpAddr + "/master")
        if err != nil {
@@ -228,6 +248,31 @@ func (vc *VermeerClient) CreateTaskSync(request 
TaskCreateRequest) (*TaskRespons
        return taskResponse, err
 }
 
+func (vc *VermeerClient) CreateTaskBatch(request TaskCreateBatchRequest) 
(*TaskBatchCreateResponse, error) {
+       reader, err := Request2Reader(request)
+       if err != nil {
+               return nil, err
+       }
+       resp, err := vc.post(vc.httpAddr+"/tasks/create/batch", reader)
+       if err != nil {
+               return nil, err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               respByte, err := ParseResponse2Byte(resp)
+               if err != nil {
+                       return nil, err
+               }
+               return nil, fmt.Errorf("response:%s", string(respByte))
+       }
+       taskResp := &TaskBatchCreateResponse{}
+       err = ParseResponse2Any(resp, taskResp)
+       if err != nil {
+               return nil, err
+       }
+       return taskResp, err
+}
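+
+// Sketch of batch creation (the graph name and params below are placeholders,
+// not values required by the API):
+//
+//   batch := TaskCreateBatchRequest{
+//           {TaskType: "compute", GraphName: "g1", Params: map[string]string{"compute.algorithm": "pagerank"}},
+//   }
+//   resp, err := vc.CreateTaskBatch(batch)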
+
 func (vc *VermeerClient) GetTasks() (*TasksResponse, error) {
        resp, err := vc.get(vc.httpAddr + "/tasks")
        if err != nil {
@@ -270,6 +315,31 @@ func (vc *VermeerClient) GetTask(taskID int) 
(*TaskResponse, error) {
        return taskResp, err
 }
 
+func (vc *VermeerClient) GetTaskStartSequence(queryTasks []int32) 
(*TaskStartSequenceResp, error) {
+       reader, err := Request2Reader(TaskStartSequenceRequest{QueryTasks: 
queryTasks})
+       if err != nil {
+               return nil, err
+       }
+       resp, err := vc.post(vc.httpAddr+"/tasks/start_sequence", reader)
+       if err != nil {
+               return nil, err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               respByte, err := ParseResponse2Byte(resp)
+               if err != nil {
+                       return nil, err
+               }
+               return nil, fmt.Errorf("response:%s", string(respByte))
+       }
+       taskResp := &TaskStartSequenceResp{}
+       err = ParseResponse2Any(resp, taskResp)
+       if err != nil {
+               return nil, err
+       }
+       return taskResp, err
+}
+
 func (vc *VermeerClient) GetEdges(graphName string, vertexID string, direction 
string) (*EdgesResponse, error) {
        resp, err := vc.get(vc.httpAddr + "/graphs/" + graphName + 
"/edges?vertex_id=" + vertexID + "&direction=" + direction)
        if err != nil {
diff --git a/vermeer/client/structure.go b/vermeer/client/structure.go
index ceeead39..ee233b42 100644
--- a/vermeer/client/structure.go
+++ b/vermeer/client/structure.go
@@ -38,6 +38,17 @@ type TaskResponse struct {
        Task TaskInfo `json:"task,omitempty"`
 }
 
+type TaskStartSequenceRequest struct {
+       QueryTasks []int32 `json:"query_tasks,omitempty"`
+}
+
+type TaskStartSequenceResp struct {
+       BaseResponse
+       Sequence []int32 `json:"sequence,omitempty"`
+}
+
+type TaskBatchCreateResponse []TaskResponse
+
 type TaskInfo struct {
        ID         int32             `json:"id,omitempty"`
        Status     string            `json:"status,omitempty"`
@@ -161,6 +172,8 @@ type TaskCreateRequest struct {
        Params    map[string]string `json:"params"`
 }
 
+type TaskCreateBatchRequest []TaskCreateRequest
+
 type GraphCreateRequest struct {
        Name string `json:"name,omitempty"`
 }
diff --git a/vermeer/config/master.ini b/vermeer/config/master.ini
index 8a7adb13..34f1859c 100644
--- a/vermeer/config/master.ini
+++ b/vermeer/config/master.ini
@@ -25,3 +25,4 @@ task_parallel_num=1
 auth=none
 auth_token_factor=1234
 start_chan_size=10
+ticker_interval=1
diff --git a/vermeer/config/master.ini b/vermeer/config/worker04.ini
similarity index 85%
copy from vermeer/config/master.ini
copy to vermeer/config/worker04.ini
index 8a7adb13..8b341c8f 100644
--- a/vermeer/config/master.ini
+++ b/vermeer/config/worker04.ini
@@ -16,12 +16,8 @@
 [default]
 log_level=info
 debug_mode=release
-http_peer=0.0.0.0:6688
-grpc_peer=0.0.0.0:6689
+http_peer=0.0.0.0:6988
+grpc_peer=0.0.0.0:6989
 master_peer=127.0.0.1:6689
-run_mode=master
-task_strategy=1
-task_parallel_num=1
-auth=none
-auth_token_factor=1234
-start_chan_size=10
+run_mode=worker
+worker_group=test
\ No newline at end of file
diff --git a/vermeer/docker-compose.yaml b/vermeer/docker-compose.yaml
new file mode 100644
index 00000000..2cf90f6a
--- /dev/null
+++ b/vermeer/docker-compose.yaml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3.8'
+
+services:
+  vermeer-master:
+    image: hugegraph/vermeer
+    container_name: vermeer-master
+    volumes:
+      - ~/:/go/bin/config # Change here to your actual config path
+    command: --env=master
+    networks:
+      vermeer_network:
+        ipv4_address: 172.20.0.10 # Assign a static IP for the master
+
+  vermeer-worker:
+    image: hugegraph/vermeer
+    container_name: vermeer-worker
+    volumes:
+      - ~/:/go/bin/config # Change here to your actual config path
+    command: --env=worker
+    networks:
+      vermeer_network:
+        ipv4_address: 172.20.0.11 # Assign a static IP for the worker
+
+networks:
+  vermeer_network:
+    driver: bridge
+    ipam:
+      config:
+        - subnet: 172.20.0.0/24 # Define the subnet for your network
\ No newline at end of file
diff --git a/vermeer/go.mod b/vermeer/go.mod
index 0ba852b4..7d854284 100644
--- a/vermeer/go.mod
+++ b/vermeer/go.mod
@@ -72,6 +72,7 @@ require (
        github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a 
// indirect
        github.com/prometheus/common v0.32.1 // indirect
        github.com/prometheus/procfs v0.7.3 // indirect
+       github.com/robfig/cron/v3 v3.0.0 // indirect
        github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // 
indirect
        github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
        github.com/ugorji/go/codec v1.2.12 // indirect
diff --git a/vermeer/go.sum b/vermeer/go.sum
index c30b27ba..74351602 100644
--- a/vermeer/go.sum
+++ b/vermeer/go.sum
@@ -395,6 +395,8 @@ github.com/prometheus/procfs v0.1.3/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
 github.com/prometheus/procfs v0.6.0/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/procfs v0.7.3 
h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
 github.com/prometheus/procfs v0.7.3/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/robfig/cron/v3 v3.0.0 
h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
+github.com/robfig/cron/v3 v3.0.0/go.mod 
h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/russross/blackfriday v1.5.2/go.mod 
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
 github.com/ryanuber/columnize v2.1.0+incompatible/go.mod 
h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
diff --git a/vermeer/test/functional/compute_base.go 
b/vermeer/test/functional/compute_base.go
index 256c7294..745fd7a5 100644
--- a/vermeer/test/functional/compute_base.go
+++ b/vermeer/test/functional/compute_base.go
@@ -97,6 +97,103 @@ func (ctb *ComputeTaskBase) SendComputeReqAsync(params 
map[string]string) {
        require.Equal(ctb.t, "complete", taskResp.Task.Status)
 }
 
+/*
+* @Description: SendComputeReqAsyncNotWait sends a compute request 
asynchronously and returns the task ID.
+* @Param params
+* @Return int32
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWait(params 
map[string]string) int32 {
+       //create Compute Task
+       resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{
+               TaskType:  "compute",
+               GraphName: ctb.graphName,
+               Params:    params,
+       })
+       require.NoError(ctb.t, err)
+       return int32(resp.Task.ID)
+}
+
+/*
+* @Description: SendComputeReqAsyncNotWaitWithError sends a compute request 
asynchronously and returns the task ID and error.
+* @Param params
+* @Return int32, error
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncNotWaitWithError(params 
map[string]string) (int32, error) {
+       //create Compute Task
+       resp, err := ctb.masterHttp.CreateTaskAsync(client.TaskCreateRequest{
+               TaskType:  "compute",
+               GraphName: ctb.graphName,
+               Params:    params,
+       })
+       if err != nil {
+               return -1, err
+       }
+       return int32(resp.Task.ID), nil
+}
+
+/*
+* @Description: SendComputeReqAsyncBatchPriority sends a batch of compute requests and returns the task IDs and their observed start sequence.
+* @Note: This function will block the main thread until all tasks are 
completed.
+* @Param params
+* @Return []int32, []int32
+ */
+func (ctb *ComputeTaskBase) SendComputeReqAsyncBatchPriority(params 
[]map[string]string) ([]int32, []int32) {
+       //create Compute Task
+       tasks := make([]client.TaskInfo, 0, len(params))
+       taskIds := make([]int32, 0, len(params))
+       createTasksParams := client.TaskCreateBatchRequest{}
+       for i := 0; i < len(params); i++ {
+               graph := ctb.graphName
+               if params[i]["graph_name"] != "" {
+                       graph = params[i]["graph_name"]
+               }
+               createTasksParams = append(createTasksParams, 
client.TaskCreateRequest{
+                       TaskType:  "compute",
+                       GraphName: graph,
+                       Params:    params[i],
+               })
+       }
+       resp, err := ctb.masterHttp.CreateTaskBatch(createTasksParams)
+       require.NoError(ctb.t, err)
+
+       for i, r := range *resp {
+               if r.BaseResponse.ErrCode != 0 {
+                       ctb.t.Fatalf("create compute task %d failed: %s", i, 
r.BaseResponse.Message)
+               }
+               tasks = append(tasks, r.Task)
+               taskIds = append(taskIds, r.Task.ID)
+       }
+
+       for i := 0; i < len(params); i++ {
+               ctb.taskID = int(tasks[i].ID)
+               // Once the compute task starts successfully, poll GetTask and parse the response, breaking when the status is "complete".
+               var taskResp *client.TaskResponse
+               var err error
+               for attempt := 0; attempt < ctb.waitSecond; attempt++ {
+                       ctb.healthCheck.DoHealthCheck()
+                       taskResp, err = ctb.masterHttp.GetTask(ctb.taskID)
+                       require.NoError(ctb.t, err)
+                       if taskResp.Task.Status == "complete" {
+                               break
+                       }
+                       require.NotEqual(ctb.t, "error", taskResp.Task.Status)
+                       time.Sleep(1 * time.Second)
+               }
+               require.Equal(ctb.t, "complete", taskResp.Task.Status)
+               fmt.Printf("Compute Task %d completed successfully\n", 
ctb.taskID)
+       }
+
+       response, err := ctb.masterHttp.GetTaskStartSequence(taskIds)
+       require.NoError(ctb.t, err)
+       sequence := response.Sequence
+       for i, id := range sequence {
+               fmt.Printf("Task %d started at position %d in the sequence\n", 
id, i+1)
+       }
+       require.Equal(ctb.t, len(taskIds), len(sequence))
+
+       return taskIds, sequence
+}
+
 // SendComputeReqSync
 //
 //     @Description: Sends the HTTP request synchronously; no override needed.
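A hedged sketch of driving the new batch-priority helper from a test, conceptually within package functional (so the testing and require imports apply); it assumes a ComputeTask built via MakeComputeTask, as in SubTestPriority later in this patch, and that the task body carries no "priority" key of its own:

    func sketchBatchPriority(t *testing.T, ct ComputeTask) {
        base := ct.TaskComputeBody()
        low := map[string]string{"priority": "0"}
        high := map[string]string{"priority": "1"}
        for k, v := range base {
            low[k], high[k] = v, v
        }
        taskIDs, sequence := ct.SendComputeReqAsyncBatchPriority(
            []map[string]string{low, high})
        // Under priority scheduling the higher-priority task should
        // start first, so the start sequence leads with the second ID.
        require.Equal(t, taskIDs[1], sequence[0])
    }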
diff --git a/vermeer/test/functional/compute_task.go 
b/vermeer/test/functional/compute_task.go
index 14ee5a52..08d65fdc 100644
--- a/vermeer/test/functional/compute_task.go
+++ b/vermeer/test/functional/compute_task.go
@@ -36,6 +36,9 @@ type ComputeTask interface {
                masterHttp *client.VermeerClient, t *testing.T, healthCheck 
*HealthCheck)
        TaskComputeBody() map[string]string
        SendComputeReqAsync(params map[string]string)
+       SendComputeReqAsyncNotWait(params map[string]string) int32
+       SendComputeReqAsyncNotWaitWithError(params map[string]string) (int32, 
error)
+       SendComputeReqAsyncBatchPriority(params []map[string]string) ([]int32, 
[]int32)
        SendComputeReqSync(params map[string]string)
        LoadComputeRes() ([]interface{}, error)
        CheckRes()
diff --git a/vermeer/test/functional/http_interface.go 
b/vermeer/test/functional/http_interface.go
index 2869db14..199261e8 100644
--- a/vermeer/test/functional/http_interface.go
+++ b/vermeer/test/functional/http_interface.go
@@ -76,6 +76,22 @@ func (ct CancelTask) CancelTask(t *testing.T, master 
*client.VermeerClient, grap
        require.Equal(t, "canceled", task.Task.Status)
 }
 
+/*
+* @Description: DirectCancelTask cancels a task directly.
+* @Param t
+* @Param master
+* @Param taskID
+ */
+func (ct CancelTask) DirectCancelTask(t *testing.T, master 
*client.VermeerClient, taskID int32) {
+       ok, err := master.GetTaskCancel(int(taskID))
+       require.NoError(t, err)
+       require.Equal(t, true, ok)
+
+       task, err := master.GetTask(int(taskID))
+       require.NoError(t, err)
+       require.Equal(t, "canceled", task.Task.Status)
+}
+
 type GetGraphs struct {
 }
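A short hedged usage sketch for DirectCancelTask; t, masterHttp, and taskID are assumed to come from the enclosing test, as in SubTestConcurrentCancellation later in this patch:

    // Cancel a specific task by ID; DirectCancelTask itself asserts
    // that the task reaches the "canceled" state.
    ct := CancelTask{}
    ct.DirectCancelTask(t, masterHttp, taskID)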
 
diff --git a/vermeer/test/functional/load_local.go 
b/vermeer/test/functional/load_local.go
index 8da57f04..52575bb1 100644
--- a/vermeer/test/functional/load_local.go
+++ b/vermeer/test/functional/load_local.go
@@ -21,6 +21,9 @@ package functional
 
 import (
        "math/rand"
+       "strconv"
+
+       "github.com/sirupsen/logrus"
 )
 
 type LoadTaskLocal struct {
@@ -43,3 +46,28 @@ func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
                "load.vertex_backend": 
vertexBackends[rand.Intn(len(vertexBackends))],
        }
 }
+
+// TaskLoadBodyWithNum creates load configuration with specified number of 
files.
+// If num <= 10, it will be automatically adjusted to 30 to ensure minimum 
test coverage.
+func (lt *LoadTaskLocal) TaskLoadBodyWithNum(num int) map[string]string {
+       vertexBackends := []string{"db", "mem"}
+
+       if num <= 10 {
+               num = 30
+       }
+
+       logrus.Infof("load with num: " + strconv.Itoa(num-1))
+
+       return map[string]string{
+               "load.parallel":     "100",
+               "load.type":         "local",
+               "load.use_property": "0",
+               //"load.use_outedge":    "1",
+               //"load.use_out_degree": "1",
+               //"load.use_undirected": "0",
+               "load.delimiter":      " ",
+               "load.vertex_files":   "{\"127.0.0.1\":\"" + 
"test/case/vertex/vertex_[0," + strconv.Itoa(num-1) + "]" + "\"}",
+               "load.edge_files":     "{\"127.0.0.1\":\"" + 
"test/case/edge/edge_[0," + strconv.Itoa(num-1) + "]" + "\"}",
+               "load.vertex_backend": 
vertexBackends[rand.Intn(len(vertexBackends))],
+       }
+}
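A hedged sketch of what TaskLoadBodyWithNum yields; the values follow directly from the code above:

    lt := &LoadTaskLocal{}
    body := lt.TaskLoadBodyWithNum(20)
    // num=20 expands to test/case/vertex/vertex_[0,19] and
    // test/case/edge/edge_[0,19]; any num <= 10 is first bumped to 30.
    fmt.Println(body["load.vertex_files"])
    // {"127.0.0.1":"test/case/vertex/vertex_[0,19]"}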
diff --git a/vermeer/test/functional/load_local.go 
b/vermeer/test/scheduler/batch.go
similarity index 52%
copy from vermeer/test/functional/load_local.go
copy to vermeer/test/scheduler/batch.go
index 8da57f04..e2d6611e 100644
--- a/vermeer/test/functional/load_local.go
+++ b/vermeer/test/scheduler/batch.go
@@ -1,5 +1,3 @@
-//go:build vermeer_test
-
 /*
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with this
@@ -17,29 +15,27 @@ License for the specific language governing permissions and 
limitations
 under the License.
 */
 
-package functional
+package scheduler
 
 import (
-       "math/rand"
+       "testing"
+       "vermeer/client"
+       "vermeer/test/functional"
 )
 
-type LoadTaskLocal struct {
-       LoadTaskTestBase
-}
-
-func (lt *LoadTaskLocal) TaskLoadBody() map[string]string {
-       vertexBackends := []string{"db", "mem"}
-
-       return map[string]string{
-               "load.parallel":     "100",
-               "load.type":         "local",
-               "load.use_property": "0",
-               //"load.use_outedge":    "1",
-               //"load.use_out_degree": "1",
-               //"load.use_undirected": "0",
-               "load.delimiter":      " ",
-               "load.vertex_files":   "{\"127.0.0.1\":\"" + 
"test/case/vertex/vertex_[0,29]" + "\"}",
-               "load.edge_files":     "{\"127.0.0.1\":\"" + 
"test/case/edge/edge_[0,29]" + "\"}",
-               "load.vertex_backend": 
vertexBackends[rand.Intn(len(vertexBackends))],
-       }
+/*
+* @Description: This is the main test function for batch.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param factor
+* @Param waitSecond
+ */
+func TestBatch(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
factor string, waitSecond int) {
+       // TEST GROUP: BATCH
+       // 1. send a batch of tasks to a single graph
+       // expect: the tasks should be executed in submission order
+       // already covered in priority.go
 }
diff --git a/vermeer/test/scheduler/priority.go 
b/vermeer/test/scheduler/priority.go
new file mode 100644
index 00000000..f15da6c9
--- /dev/null
+++ b/vermeer/test/scheduler/priority.go
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+       "fmt"
+       "sync"
+       "testing"
+       "time"
+
+       "vermeer/apps/structure"
+       "vermeer/client"
+       "vermeer/test/functional"
+
+       "github.com/sirupsen/logrus"
+       "github.com/stretchr/testify/require"
+)
+
+/*
+* @Description: SubTestPriority tests the scheduler's behavior when submitting 
tasks with different priorities.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestPriority(t *testing.T, expectRes *functional.ExpectRes, 
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, 
graphName []string, computeTask string, waitSecond int) {
+       fmt.Printf("Test Priority start with task: %s\n", computeTask)
+       bTime := time.Now()
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBody := computeTest.TaskComputeBody()
+
+       // send two tasks with different priority
+       params := make([]map[string]string, 0)
+
+       for i := 0; i < 2; i++ {
+               param := make(map[string]string)
+               param["priority"] = fmt.Sprintf("%d", i)
+               for k, v := range taskComputeBody {
+                       param[k] = v
+               }
+               params = append(params, param)
+       }
+
+       logrus.Infof("params for priority test: %+v", params)
+
+       taskids, sequence := 
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests 
asynchronously with priority
+
+       require.Equal(t, 2, len(sequence))
+       for i := 0; i < 2; i++ {
+               require.Equal(t, taskids[1-i], sequence[i]) // expect the priority-1 task to start before the priority-0 task
+       }
+
+       computeTest.CheckRes()
+       fmt.Printf("Test Priority: %-30s [OK], cost: %v\n", computeTask, 
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestSmall tests the scheduler's behavior when submitting 
tasks with different sizes.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestSmall(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
computeTask string, waitSecond int) {
+       fmt.Printf("Test Small start with task: %s\n", computeTask)
+       bTime := time.Now()
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTaskSmall, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBody := computeTest.TaskComputeBody()
+       computeTaskSmall.Init(graphName[1], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBodySmall := computeTaskSmall.TaskComputeBody()
+
+       // send two tasks of different sizes (large graph first, small graph second)
+       params := make([]map[string]string, 0)
+       taskComputeBody["graph_name"] = graphName[0]
+       taskComputeBodySmall["graph_name"] = graphName[1]
+       params = append(params, taskComputeBody)
+       params = append(params, taskComputeBodySmall)
+
+       logrus.Infof("params for small test: %+v", params)
+
+       taskids, sequence := 
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests 
asynchronously with priority
+
+       require.Equal(t, 2, len(sequence))
+       for i := 0; i < 2; i++ {
+               require.Equal(t, taskids[1-i], sequence[i]) // expect the smaller task to start before the larger one
+       }
+
+       computeTest.CheckRes()
+       fmt.Printf("Test Small: %-30s [OK], cost: %v\n", computeTask, 
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestConcurrent tests the scheduler's behavior when submitting non-exclusive tasks that can run concurrently.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestConcurrent(t *testing.T, expectRes *functional.ExpectRes, 
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, 
graphName []string, computeTask string, waitSecond int) {
+       fmt.Printf("Test Concurrent start with task: %s\n", computeTask)
+       bTime := time.Now()
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[1], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBody := computeTest.TaskComputeBody()
+
+       // send two identical non-exclusive tasks
+       params := make([]map[string]string, 0)
+       // "exclusive" defaults to false, so setting it here is only for clarity
+       taskComputeBody["exclusive"] = "false"
+       params = append(params, taskComputeBody)
+       params = append(params, taskComputeBody)
+
+       logrus.Infof("params for concurrent test: %+v", params)
+
+       _, sequence := computeTest.SendComputeReqAsyncBatchPriority(params) // 
send multiple requests asynchronously with priority
+
+       require.Equal(t, 2, len(sequence))
+
+       fmt.Printf("Test Concurrent: %-30s [OK], cost: %v\n", computeTask, 
time.Since(bTime))
+       // cost should be less than 2 * single task time
+}
+
+/*
+* @Description: SubTestDepends tests the scheduler's behavior when submitting 
tasks with different dependencies.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestDepends(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
computeTask string, waitSecond int) {
+       fmt.Printf("Test Depends start with task: %s\n", computeTask)
+       bTime := time.Now()
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBody := computeTest.TaskComputeBody()
+
+       // first allocate the "test" worker group for the _3 graph
+       masterHttp.AllocGroupGraph(graphName[0]+"_3", "test")
+
+       loadTest3 := functional.LoadTaskLocal{}
+       loadTest3.Init(graphName[0]+"_3", expectRes, masterHttp, waitSecond, t, 
healthCheck)
+       loadTest3.SendLoadRequest(loadTest3.TaskLoadBodyWithNum(10))
+
+       // send a large task to the default worker group
+       taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody)
+
+       // send two tasks with different dependency to the same graph
+       taskComputeBody["graph_name"] = graphName[0] + "_3"
+       params := make([]map[string]string, 0)
+       newBody := make(map[string]string)
+       for k, v := range taskComputeBody {
+               newBody[k] = v
+       }
+       newBody["preorders"] = fmt.Sprintf("%d", taskid)
+       params = append(params, newBody)
+       params = append(params, taskComputeBody)
+
+       logrus.Infof("params for depends test: %+v", params)
+
+       taskids, sequence := 
computeTest.SendComputeReqAsyncBatchPriority(params) // send multiple requests 
asynchronously with priority
+
+       require.Equal(t, 2, len(sequence))
+       for i := 0; i < 2; i++ {
+               require.Equal(t, taskids[1-i], sequence[i]) // expect the task without the dependency to start first
+       }
+
+       // computeTest.CheckRes()
+       fmt.Printf("Test Depends: %-30s [OK], cost: %v\n", computeTask, 
time.Since(bTime))
+}
+
+/*
+* @Description: SubTestInvalidDependency tests the scheduler's behavior when a 
compute task is submitted with a dependency on a non-existent (invalid) task ID.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestInvalidDependency(t *testing.T, expectRes *functional.ExpectRes, 
healthCheck *functional.HealthCheck, masterHttp *client.VermeerClient, 
graphName []string, computeTask string, waitSecond int) {
+       fmt.Printf("Test Invalid Dependency start with task: %s\n", computeTask)
+       bTime := time.Now()
+
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+
+       taskBody := computeTest.TaskComputeBody()
+       // set preorders to a very large, theoretically nonexistent task ID
+       invalidTaskID := 999999999
+       taskBody["preorders"] = fmt.Sprintf("%d", invalidTaskID)
+
+       logrus.Infof("Attempting to submit a task with invalid dependency on 
ID: %d", invalidTaskID)
+
+       // try to submit task asynchronously and check if it returns an error
+       taskID, err := computeTest.SendComputeReqAsyncNotWaitWithError(taskBody)
+
+       // assert that the submission operation failed
+       require.Error(t, err, "Submitting a task with a non-existent dependency 
should return an error.")
+       // assert that the returned task ID is the -1 failure sentinel
+       require.Equal(t, int32(-1), taskID, "The task ID should be -1 on failure.")
+
+       fmt.Printf("Test Invalid Dependency: %-30s [OK], cost: %v\n", 
computeTask, time.Since(bTime))
+}
+
+/*
+* @Description: SubTestConcurrentCancellation tests the scheduler's behavior 
when submitting tasks concurrently and canceling them.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestConcurrentCancellation(t *testing.T, expectRes 
*functional.ExpectRes, healthCheck *functional.HealthCheck, masterHttp 
*client.VermeerClient, graphName []string, computeTask string, waitSecond int) {
+       fmt.Printf("Test Concurrent Cancellation start with task: %s\n", 
computeTask)
+       bTime := time.Now()
+
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+
+       // set task number
+       const numTasks = 20
+       taskBodies := make([]map[string]string, numTasks)
+       for i := 0; i < numTasks; i++ {
+               taskBodies[i] = computeTest.TaskComputeBody()
+       }
+
+       taskIDs := make(chan int32, numTasks)
+       var wg sync.WaitGroup
+
+       // 1. submit tasks concurrently
+       for i := 0; i < numTasks; i++ {
+               wg.Add(1)
+               go func(body map[string]string) {
+                       defer wg.Done()
+                       taskID := computeTest.SendComputeReqAsyncNotWait(body)
+                       if taskID != 0 {
+                               taskIDs <- taskID
+                       } else {
+                               logrus.Errorf("failed to submit task: got invalid task ID %d", taskID)
+                       }
+               }(taskBodies[i])
+       }
+
+       wg.Wait()
+       close(taskIDs)
+
+       submittedTaskIDs := make([]int32, 0, numTasks)
+       for id := range taskIDs {
+               submittedTaskIDs = append(submittedTaskIDs, id)
+       }
+
+       logrus.Infof("Submitted %d tasks concurrently: %+v", 
len(submittedTaskIDs), submittedTaskIDs)
+       require.Equal(t, numTasks, len(submittedTaskIDs), "Not all tasks were 
successfully submitted.")
+
+       // 2. cancel the last submitted task
+       cancelTask := functional.CancelTask{}
+       cancelTask.DirectCancelTask(t, masterHttp, submittedTaskIDs[len(submittedTaskIDs)-1])
+
+       // 3. verify task status
+       // wait for tasks to settle
+       logrus.Info("Waiting for tasks to settle...")
+       time.Sleep(time.Duration(waitSecond) * time.Second)
+
+       checkTask, err := masterHttp.GetTask(int(submittedTaskIDs[numTasks-1]))
+
+       require.NoError(t, err, "Error fetching task status after 
cancellation.")
+       require.NotNil(t, checkTask, "Task should exist after cancellation.")
+
+       if structure.TaskState(checkTask.Task.Status) != structure.TaskStateCanceled {
+               logrus.Warn("The cancelled task did not reach the canceled state; check scheduler behavior.")
+               require.Fail(t, "Expected the cancelled task to be in the canceled state.")
+       }
+
+       fmt.Printf("Test Concurrent Cancellation: %-30s [OK], cost: %v\n", 
computeTask, time.Since(bTime))
+}
+
+/*
+* @Description: This is the main test function for priority.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param factor
+* @Param waitSecond
+ */
+func TestPriority(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
factor string, waitSecond int) {
+       fmt.Print("start test priority\n")
+
+       // for scheduler, just test a simple task
+       var computeTask = "pagerank"
+
+       // TEST GROUP: PRIORITY
+       // 1. send tasks with different priorities to a single graph
+       // expect: the tasks should be executed in order of priority
+
+       SubTestPriority(t, expectRes, healthCheck, masterHttp, graphName, 
computeTask, waitSecond)
+
+       // 2. send small and large tasks to a single graph
+       // expect: the small tasks should be executed first
+
+       SubTestSmall(t, expectRes, healthCheck, masterHttp, graphName, 
computeTask, waitSecond)
+
+       // 3. send tasks that allow concurrent execution to a single graph
+       // expect: the tasks should be executed concurrently
+       SubTestConcurrent(t, expectRes, healthCheck, masterHttp, graphName, 
computeTask, waitSecond)
+
+       // 4. send dependent tasks to a single graph
+       // expect: the tasks should be executed in order of dependency
+
+       SubTestDepends(t, expectRes, healthCheck, masterHttp, graphName, 
computeTask, waitSecond)
+
+       // 5. send tasks with the same priority to a single graph
+       // expect: the tasks should be executed in submission order
+       // skipped: too fragile to assert reliably
+
+       // 6. send tasks to different graphs
+       // expect: the tasks should be executed concurrently
+       // already covered by SubTestSmall and SubTestDepends
+
+       // 7. send tasks with an invalid dependency to a single graph
+       // expect: the tasks should not be executed
+       SubTestInvalidDependency(t, expectRes, healthCheck, masterHttp, 
graphName, computeTask, waitSecond)
+
+       // 8. send tasks concurrently and cancel them
+       // expect: the tasks should be cancelled
+       SubTestConcurrentCancellation(t, expectRes, healthCheck, masterHttp, 
graphName, computeTask, 3)
+}
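The tests above assert only the observable start order. As a toy illustration of the "higher priority pops first" ordering they expect (an assumption about the policy in scheduler_algorithm_manager.go, whose body is not shown in this excerpt), a container/heap sketch:

    package main

    import (
        "container/heap"
        "fmt"
    )

    // taskItem is a toy stand-in for a queued task.
    type taskItem struct {
        id       int32
        priority int // larger value means more urgent, as in the tests above
    }

    // pq orders tasks so the highest priority pops first.
    type pq []taskItem

    func (p pq) Len() int           { return len(p) }
    func (p pq) Less(i, j int) bool { return p[i].priority > p[j].priority }
    func (p pq) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
    func (p *pq) Push(x any)        { *p = append(*p, x.(taskItem)) }
    func (p *pq) Pop() any {
        old := *p
        item := old[len(old)-1]
        *p = old[:len(old)-1]
        return item
    }

    func main() {
        q := &pq{{id: 1, priority: 0}, {id: 2, priority: 1}}
        heap.Init(q)
        for q.Len() > 0 {
            fmt.Println(heap.Pop(q).(taskItem).id) // prints 2, then 1
        }
    }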
diff --git a/vermeer/test/scheduler/routine.go 
b/vermeer/test/scheduler/routine.go
new file mode 100644
index 00000000..e5ced6bb
--- /dev/null
+++ b/vermeer/test/scheduler/routine.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+       "fmt"
+       "testing"
+       "time"
+       "vermeer/client"
+       "vermeer/test/functional"
+
+       "github.com/sirupsen/logrus"
+       "github.com/stretchr/testify/require"
+)
+
+/*
+* @Description: SubTestRoutine tests the scheduler's behavior when submitting 
tasks with cron expression.
+* @Param t
+* @Param expectRes
+* @Param healthCheck
+* @Param masterHttp
+* @Param graphName
+* @Param computeTask
+* @Param waitSecond
+ */
+func SubTestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
computeTask string, waitSecond int) {
+       fmt.Printf("Test Routine start with task: %s\n", computeTask)
+       bTime := time.Now()
+       computeTest, err := functional.MakeComputeTask(computeTask)
+       require.NoError(t, err)
+       computeTest.Init(graphName[0], computeTask, expectRes, waitSecond, 
masterHttp, t, healthCheck)
+       taskComputeBody := computeTest.TaskComputeBody()
+
+       // every 1 minute
+       taskComputeBody["cron_expr"] = "* * * * *"
+
+       logrus.Infof("params for routine test: %+v", taskComputeBody)
+
+       taskid := computeTest.SendComputeReqAsyncNotWait(taskComputeBody)
+       // computeTest.CheckRes()
+
+       // wait for a while and check again
+       time.Sleep(2 * time.Minute)
+
+       // check that the cron schedule spawned a follow-up task (taskid+1) and that it has started
+       queue := []int32{taskid + 1}
+       result, err := masterHttp.GetTaskStartSequence(queue)
+       require.NoError(t, err)
+       require.Equal(t, 1, len(result.Sequence))
+       require.Greater(t, result.Sequence[0], int32(0))
+
+       masterHttp.GetTaskCancel(int(taskid))
+
+       fmt.Printf("Test Routine: %-30s [OK], cost: %v\n", computeTask, 
time.Since(bTime))
+}
+
+func TestRoutine(t *testing.T, expectRes *functional.ExpectRes, healthCheck 
*functional.HealthCheck, masterHttp *client.VermeerClient, graphName []string, 
factor string, waitSecond int) {
+       var computeTask = "pagerank"
+
+       // TEST GROUP: ROUTINE
+       // 1. send tasks to single graph
+       // expect: the tasks should be executed on schedule
+
+       SubTestRoutine(t, expectRes, healthCheck, masterHttp, graphName, 
computeTask, waitSecond)
+}
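SubTestRoutine sleeps two minutes and then expects a follow-up task; that timing falls straight out of the cron expression. A minimal sketch of how the robfig/cron/v3 dependency (added in go.sum above) evaluates it:

    package main

    import (
        "fmt"
        "time"

        "github.com/robfig/cron/v3"
    )

    func main() {
        // "* * * * *" fires once per minute, as used by SubTestRoutine.
        sched, err := cron.ParseStandard("* * * * *")
        if err != nil {
            panic(err)
        }
        now := time.Now()
        fmt.Printf("next run in %v\n", sched.Next(now).Sub(now)) // at most one minute
    }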
diff --git a/vermeer/test/scheduler/test_scheduler.go 
b/vermeer/test/scheduler/test_scheduler.go
new file mode 100644
index 00000000..7d0274cb
--- /dev/null
+++ b/vermeer/test/scheduler/test_scheduler.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with this
+work for additional information regarding copyright ownership. The ASF
+licenses this file to You under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+*/
+
+package scheduler
+
+import (
+       "fmt"
+       "net/http"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       "vermeer/client"
+       "vermeer/test/functional"
+)
+
+/*
+* @Description: This is the main test function for scheduler.
+* @Param t
+* @Param expectResPath
+* @Param masterHttpAddr
+* @Param graphName
+* @Param factor
+* @Param waitSecond
+* @Note: You must start at least two workers (e.g. worker01 and worker04) as defined in your config before running this test.
+ */
+func TestScheduler(t *testing.T, expectResPath string, masterHttpAddr string, 
graphName string, factor string, waitSecond int) {
+       fmt.Print("start test scheduler\n")
+
+       startTime := time.Now()
+       expectRes, err := functional.GetExpectRes(expectResPath)
+       require.NoError(t, err)
+
+       masterHttp := client.VermeerClient{}
+       masterHttp.Init("http://"+masterHttpAddr, http.DefaultClient)
+
+       // health check
+       healthCheck := functional.HealthCheck{}
+       healthCheck.Init(t, &masterHttp)
+       healthCheck.DoHealthCheck()
+
+       // load graph first
+       loadTest1 := functional.LoadTaskLocal{}
+       loadTest1.Init(graphName+"_1", expectRes, &masterHttp, waitSecond, t, 
&healthCheck)
+       loadTest1.SendLoadRequest(loadTest1.TaskLoadBodyWithNum(0))
+       loadTest1.CheckGraph()
+
+       loadTest2 := functional.LoadTaskLocal{}
+       loadTest2.Init(graphName+"_2", expectRes, &masterHttp, waitSecond, t, 
&healthCheck)
+       loadTest2.SendLoadRequest(loadTest2.TaskLoadBodyWithNum(20))
+       // loadTest2.CheckGraph()
+
+       TestPriority(t, expectRes, &healthCheck, &masterHttp, 
[]string{graphName + "_1", graphName + "_2"}, factor, waitSecond)
+
+       TestBatch(t, expectRes, &healthCheck, &masterHttp, []string{graphName + 
"_1"}, factor, waitSecond)
+
+       TestRoutine(t, expectRes, &healthCheck, &masterHttp, []string{graphName 
+ "_2"}, factor, waitSecond)
+
+       // Error handling: cancel task
+       cancelTask := functional.CancelTask{}
+       cancelTask.CancelTask(t, &masterHttp, graphName+"_1")
+       cancelTask.CancelTask(t, &masterHttp, graphName+"_2")
+       fmt.Print("test cancel task [OK]\n")
+
+       // Finally, delete graph
+       deleteGraph := functional.DeleteGraph{}
+       deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_1")
+       deleteGraph.DeleteGraph(t, &masterHttp, graphName+"_2")
+       fmt.Print("test delete graph [OK]\n")
+
+       fmt.Printf("client test finished, cost time:%v\n", 
time.Since(startTime))
+}
diff --git a/vermeer/vermeer_test.go b/vermeer/vermeer_test.go
index 4dde004d..fcc93272 100644
--- a/vermeer/vermeer_test.go
+++ b/vermeer/vermeer_test.go
@@ -31,6 +31,7 @@ import (
 
        "vermeer/client"
        "vermeer/test/functional"
+       "vermeer/test/scheduler"
 )
 
 var (
@@ -95,6 +96,8 @@ func TestVermeer(t *testing.T) {
                t.Run("algorithms", testAlgorithms)
        case "function":
                t.Run("function", testFunction)
+       case "scheduler":
+               t.Run("scheduler", testScheduler)
        }
 }
 
@@ -102,10 +105,15 @@ func testFunction(t *testing.T) {
        functional.TestFunction(t, expectResPath, masterHttpAddr, graphName, 
factor, waitSecond)
 }
 
+func testScheduler(t *testing.T) {
+       scheduler.TestScheduler(t, expectResPath, masterHttpAddr, graphName, 
factor, waitSecond)
+}
+
 func testAlgorithms(t *testing.T) {
        // todo: add more algorithm names here
        var computeTasks = []string{"pagerank", "lpa", "wcc", "degree_out", 
"degree_in", "degree_both", "triangle_count",
                "sssp", "closeness_centrality", "betweenness_centrality", 
"kcore", "jaccard", "ppr", "clustering_coefficient", "scc", "louvain"}
+       // var computeTasks = []string{"pagerank"}
 
        startTime := time.Now()
        expectRes, err := functional.GetExpectRes(expectResPath)
@@ -158,6 +166,7 @@ func testAlgorithms(t *testing.T) {
                taskComputeBody["output.need_query"] = needQuery
                if sendType == "async" {
                        computeTest.SendComputeReqAsync(taskComputeBody)
+                       // computeTest.SendComputeReqAsyncBatchPriority(10, taskComputeBody) // send multiple requests asynchronously
                } else {
                        computeTest.SendComputeReqSync(taskComputeBody)
                }
