Hi Nemo devs,

I'm thinking about reorganizing the scheduler code as follows to resolve
NEMO-50 (Carefully retry tasks in the scheduler) (
https://issues.apache.org/jira/projects/NEMO/issues/NEMO-50).
The main problems this solves are restarting tasks that do not need to be
restarted [ref 1], and not restarting tasks that do need to be restarted
[ref 2].
Please let me know if you have any comments.

Meanwhile I'll start writing code and unit tests to check if this approach
makes sense.
After this issue, I'll take NEMO-54/55 (Data/Control protocol fault
handling), after which we should be able to run fault-injected integration
tests using workloads like ALS-Pado.

Cheers,
John

[ref 1]
https://github.com/apache/incubator-nemo/blob/master/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java#L195
[ref 2]
https://github.com/apache/incubator-nemo/blob/master/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java#L556

[Goals]

- Restart only the tasks that need to be restarted, and no others

- Use a minimum number of states

- Single-threaded, single-place state transition handling (i.e., 'write'
access limited to a single thread)

- Use a simple queue, and replace
PendingTaskCollection.RemoveTasksAndDescendants
with an input block availability checker


[State machines]

- BlockState: NOT_AVAILABLE (initial), IN_PROGRESS (‘push’ receivers wait
on this), AVAILABLE

- TaskState: READY (initial), EXECUTING, COMPLETE, FAILED_UNRECOVERABLE,
ON_HOLD

- StageState: READY (initial), EXECUTING, COMPLETE
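
As a rough sketch, the three state machines above could be written as plain
Java enums. The state names come from the list above. The transition table for
TaskState is my reading of the key methods and failure handling described in
this mail, not something already in the codebase, and the TaskTransitions class
name is made up for illustration. Transitions for ON_HOLD and
FAILED_UNRECOVERABLE are not spelled out in this mail, so the sketch leaves
them empty.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum BlockState { NOT_AVAILABLE, IN_PROGRESS, AVAILABLE }

enum TaskState { READY, EXECUTING, COMPLETE, FAILED_UNRECOVERABLE, ON_HOLD }

enum StageState { READY, EXECUTING, COMPLETE }

/** Hypothetical helper: which TaskState transitions this proposal implies. */
final class TaskTransitions {
  private static final Map<TaskState, Set<TaskState>> ALLOWED =
      new EnumMap<>(TaskState.class);

  static {
    // scheduleStage(): READY -> EXECUTING
    ALLOWED.put(TaskState.READY, EnumSet.of(TaskState.EXECUTING));
    // taskCompleted(): EXECUTING -> COMPLETE; failure handling: EXECUTING -> READY
    ALLOWED.put(TaskState.EXECUTING, EnumSet.of(TaskState.COMPLETE, TaskState.READY));
    // recursivelySetParentTasksToRestart(): COMPLETE -> READY
    ALLOWED.put(TaskState.COMPLETE, EnumSet.of(TaskState.READY));
    // Not described in this mail, so no outgoing transitions are assumed here.
    ALLOWED.put(TaskState.FAILED_UNRECOVERABLE, EnumSet.noneOf(TaskState.class));
    ALLOWED.put(TaskState.ON_HOLD, EnumSet.noneOf(TaskState.class));
  }

  private TaskTransitions() { }

  static boolean isAllowed(final TaskState from, final TaskState to) {
    return ALLOWED.get(from).contains(to);
  }
}
```

Keeping the table in one place fits the goal of single-place state transition
handling: the single writer thread can check isAllowed() before every update.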


[BatchSingleJobScheduler: Key methods]


(1) doSchedule()

Find the lowest-numbered scheduling group that has an EXECUTING/READY stage

Make appropriate group-internal blocks IN_PROGRESS

In reverse-topological order, call scheduleStage(stage) for each stage


(2) scheduleStage(stage)

For each READY task,

- BlockState: IN_PROGRESS

- TaskState: EXECUTING

- StageState: EXECUTING

Enqueue the tasks to the pending queue (a different thread will dequeue,
check block availability, and schedule to an executor)


(3) taskCompleted(task)

- BlockState: AVAILABLE

- TaskState: COMPLETE

If all tasks of the stage are COMPLETE,

- StageState: COMPLETE

- doSchedule()
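
To make methods (1) to (3) concrete, here is a minimal single-threaded sketch.
It is not the actual BatchSingleJobScheduler code. It assumes that stages are
registered in topological order, that each task produces exactly one output
block (keyed by the task id), and that all ids are plain strings. The class
name SchedulerSketch and the addStage() helper are made up for this example.
The separate step "make appropriate group-internal blocks IN_PROGRESS" is
folded into scheduleStage() here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchedulerSketch {
  enum BlockState { NOT_AVAILABLE, IN_PROGRESS, AVAILABLE }
  enum TaskState { READY, EXECUTING, COMPLETE }
  enum StageState { READY, EXECUTING, COMPLETE }

  // Insertion order is assumed to be a topological order of the stages.
  final Map<String, Integer> stageToGroup = new LinkedHashMap<>();
  final Map<String, List<String>> stageToTasks = new HashMap<>();
  final Map<String, String> taskToStage = new HashMap<>();
  final Map<String, StageState> stageStates = new HashMap<>();
  final Map<String, TaskState> taskStates = new HashMap<>();
  // Simplification: one output block per task, keyed by the task id.
  final Map<String, BlockState> blockStates = new HashMap<>();
  // A different thread would dequeue from here and check block availability.
  final Deque<String> pendingQueue = new ArrayDeque<>();

  void addStage(final String stageId, final int group, final List<String> taskIds) {
    stageToGroup.put(stageId, group);
    stageToTasks.put(stageId, new ArrayList<>(taskIds));
    stageStates.put(stageId, StageState.READY);
    for (final String taskId : taskIds) {
      taskToStage.put(taskId, stageId);
      taskStates.put(taskId, TaskState.READY);
      blockStates.put(taskId, BlockState.NOT_AVAILABLE);
    }
  }

  void doSchedule() {
    // Lowest scheduling group number that still has a READY/EXECUTING stage.
    Integer minGroup = null;
    for (final Map.Entry<String, Integer> e : stageToGroup.entrySet()) {
      if (stageStates.get(e.getKey()) != StageState.COMPLETE
          && (minGroup == null || e.getValue() < minGroup)) {
        minGroup = e.getValue();
      }
    }
    if (minGroup == null) {
      return; // every stage is COMPLETE
    }
    // Visit the stages of that group in reverse-topological order.
    final List<String> stages = new ArrayList<>(stageToGroup.keySet());
    for (int i = stages.size() - 1; i >= 0; i--) {
      if (stageToGroup.get(stages.get(i)).equals(minGroup)) {
        scheduleStage(stages.get(i));
      }
    }
  }

  void scheduleStage(final String stageId) {
    for (final String taskId : stageToTasks.get(stageId)) {
      if (taskStates.get(taskId) == TaskState.READY) {
        blockStates.put(taskId, BlockState.IN_PROGRESS);
        taskStates.put(taskId, TaskState.EXECUTING);
        stageStates.put(stageId, StageState.EXECUTING);
        pendingQueue.add(taskId);
      }
    }
  }

  void taskCompleted(final String taskId) {
    blockStates.put(taskId, BlockState.AVAILABLE);
    taskStates.put(taskId, TaskState.COMPLETE);
    final String stageId = taskToStage.get(taskId);
    for (final String sibling : stageToTasks.get(stageId)) {
      if (taskStates.get(sibling) != TaskState.COMPLETE) {
        return; // the stage still has unfinished tasks
      }
    }
    stageStates.put(stageId, StageState.COMPLETE);
    doSchedule();
  }
}
```

With two stages in groups 0 and 1, the first doSchedule() enqueues only the
group-0 tasks, and the group-1 tasks are enqueued once the last group-0 task
completes.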


(4) recursivelySetParentTasksToRestart(task)

Recursively find parent tasks whose blocks are NOT_AVAILABLE, and make those
tasks/stages READY if they are COMPLETE (taskAttempt++)
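
A minimal sketch of the recursion in (4), under the same simplifying
assumptions as before: one output block per task keyed by the task id, string
ids, and a made-up RestartSketch class holding plain maps. It only walks up
through parents whose output block is NOT_AVAILABLE, which is what keeps the
set of restarted tasks small.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RestartSketch {
  enum BlockState { NOT_AVAILABLE, IN_PROGRESS, AVAILABLE }
  enum TaskState { READY, EXECUTING, COMPLETE }
  enum StageState { READY, EXECUTING, COMPLETE }

  // task id -> ids of the parent tasks whose output blocks it reads
  final Map<String, List<String>> parents = new HashMap<>();
  final Map<String, String> taskToStage = new HashMap<>();
  final Map<String, TaskState> taskStates = new HashMap<>();
  final Map<String, StageState> stageStates = new HashMap<>();
  // Simplification: one output block per task, keyed by the task id.
  final Map<String, BlockState> blockStates = new HashMap<>();
  final Map<String, Integer> taskAttempts = new HashMap<>();

  void recursivelySetParentTasksToRestart(final String task) {
    for (final String parent : parents.getOrDefault(task, Collections.emptyList())) {
      if (blockStates.get(parent) != BlockState.NOT_AVAILABLE) {
        continue; // the input is still there (or being produced): stop here
      }
      if (taskStates.get(parent) == TaskState.COMPLETE) {
        taskStates.put(parent, TaskState.READY);
        taskAttempts.merge(parent, 1, Integer::sum); // taskAttempt++
        final String stage = taskToStage.get(parent);
        if (stageStates.get(stage) == StageState.COMPLETE) {
          stageStates.put(stage, StageState.READY);
        }
      }
      // The parent's own inputs may be gone as well.
      recursivelySetParentTasksToRestart(parent);
    }
  }
}
```

For a chain a -> b -> c where only a's block is lost, calling this on c changes
nothing, because b's block is still AVAILABLE. If b's block is lost too, both
b and a go back to READY.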


[Failure handling]


(1) EXECUTOR_LOST (this happens due to things like resource eviction)

- Set all blocks stored on the lost executor as NOT_AVAILABLE

- Set all tasks running on the lost executor as READY (taskAttempt++)

- For each of those tasks, recursivelySetParentTasksToRestart(task)

- doSchedule()


(2) Task INPUT_READ_FAILURE (this happens, for example, when the executor
storing the input block is lost)

- Set failed block as NOT_AVAILABLE

- Set failed task as READY (taskAttempt++)

- For the failed task, recursivelySetParentTasksToRestart(task)

- doSchedule()
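
Both failure cases follow the same four steps, which the sketch below tries to
show. The SchedulerOps interface, the FailureHandlerSketch class, and the
RecordingOps stub are all hypothetical names introduced for this example. The
real state changes would live in the scheduler, and the stub only records the
order of the calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical view of the scheduler operations the handlers need. */
interface SchedulerOps {
  void setBlockNotAvailable(String blockId);
  void setTaskReady(String taskId); // also does taskAttempt++
  void recursivelySetParentTasksToRestart(String taskId);
  void doSchedule();
}

final class FailureHandlerSketch {
  private final SchedulerOps ops;

  FailureHandlerSketch(final SchedulerOps ops) {
    this.ops = ops;
  }

  /** (1) EXECUTOR_LOST: everything stored or running on the executor is gone. */
  void onExecutorLost(final List<String> blocksOnExecutor,
                      final List<String> tasksOnExecutor) {
    for (final String blockId : blocksOnExecutor) {
      ops.setBlockNotAvailable(blockId);
    }
    for (final String taskId : tasksOnExecutor) {
      ops.setTaskReady(taskId);
    }
    for (final String taskId : tasksOnExecutor) {
      ops.recursivelySetParentTasksToRestart(taskId);
    }
    ops.doSchedule();
  }

  /** (2) INPUT_READ_FAILURE: one task failed to read one input block. */
  void onInputReadFailure(final String failedBlockId, final String failedTaskId) {
    ops.setBlockNotAvailable(failedBlockId);
    ops.setTaskReady(failedTaskId);
    ops.recursivelySetParentTasksToRestart(failedTaskId);
    ops.doSchedule();
  }
}

/** Test stub that records the calls instead of changing any state. */
final class RecordingOps implements SchedulerOps {
  final List<String> log = new ArrayList<>();

  @Override
  public void setBlockNotAvailable(final String blockId) {
    log.add("block:" + blockId);
  }

  @Override
  public void setTaskReady(final String taskId) {
    log.add("ready:" + taskId);
  }

  @Override
  public void recursivelySetParentTasksToRestart(final String taskId) {
    log.add("restart:" + taskId);
  }

  @Override
  public void doSchedule() {
    log.add("schedule");
  }
}
```

Marking the blocks NOT_AVAILABLE before walking the parents matters, since the
recursion decides where to stop by looking at block states.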
