sanha commented on a change in pull request #1: [NEMO-26] Implement
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172728122
##########
File path:
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
##########
@@ -526,8 +403,133 @@ private PhysicalStage getStageById(final String stageId)
{
throw new RuntimeException(new Throwable("This taskGroupId does not exist
in the plan"));
}
- @Override
- public void terminate() {
- // nothing to do yet.
+ /**
+ * Action after task group execution has been completed, not after it has
been put on hold.
+ *
+ * @param executorId the ID of the executor.
+ * @param taskGroupId the ID of the task group completed.
+ */
+ private void onTaskGroupExecutionComplete(final String executorId,
+ final String taskGroupId) {
+ onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+ }
+
+ /**
+ * Action after task group execution has been completed.
+ * @param executorId the ID of the executor.
+ * @param taskGroupId the ID of the task group completed.
+ * @param isOnHoldToComplete whether the task group is being switched to complete
after it has been put on hold.
+ */
+ private void onTaskGroupExecutionComplete(final String executorId,
+ final String taskGroupId,
+ final Boolean isOnHoldToComplete) {
+ LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+ if (!isOnHoldToComplete) {
+ schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+ }
+
+ final String stageIdForTaskGroupUponCompletion =
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+ if
(jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
+ // if the stage this task group belongs to is complete,
+ if (!jobStateManager.checkJobTermination()) { // and if the job is not
yet complete or failed,
+ scheduleNextStage(stageIdForTaskGroupUponCompletion);
+ }
+ }
+ }
+
+ /**
+ * Action after task group execution has been put on hold.
+ * @param executorId the ID of the executor.
+ * @param taskGroupId the ID of the task group.
+ * @param taskPutOnHold the ID of task that is put on hold.
+ */
+ private void onTaskGroupExecutionOnHold(final String executorId,
+ final String taskGroupId,
+ final String taskPutOnHold) {
+ LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
+ schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+ final String stageIdForTaskGroupUponCompletion =
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+
+ final boolean stageComplete =
+
jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+
+ if (stageComplete) {
+ // get optimization vertex from the task.
+ final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
+ getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks
list
+ .filter(task -> task.getId().equals(taskPutOnHold)) // find it
+ .map(physicalPlan::getIRVertexOf) // get the corresponding
IRVertex, the MetricCollectionBarrierVertex
+ .filter(irVertex -> irVertex instanceof
MetricCollectionBarrierVertex)
+ .distinct()
+ .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) //
convert types
+ .findFirst().orElseThrow(() -> new
RuntimeException(ON_HOLD.name() // get it
+ + " called with failed task ids by some other task than "
+ + MetricCollectionBarrierTask.class.getSimpleName()));
+ // and we will use this vertex to perform metric collection and dynamic
optimization.
+
+ pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
+ new DynamicOptimizationEvent(physicalPlan,
metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+ } else {
+ onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+ }
+ }
+
+ private void onTaskGroupExecutionFailedRecoverable(final String executorId,
final String taskGroupId,
Review comment:
Please add a comment about this method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services