[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187039736
 
 

 ##
 File path: compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
 ##
 @@ -22,44 +22,49 @@
 import java.io.FileOutputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.List;
 
 /**
  * Collect transform.
  * @param <T> type of data to collect.
  */
 public final class CollectTransform implements Transform {
   private String filename;
+  private FileOutputStream fos;
 
 Review comment:
   Why do we need to keep `fos` and `oos` as fields? They can be created locally in `close()`.
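A minimal sketch of the suggested change, assuming a hypothetical stand-in class (`CollectSketch`, with its own `onData`/`close` methods rather than Nemo's `Transform` interface). Only the buffered `list` stays a field; `fos` and `oos` exist only inside `close()`. It also assumes the collected elements are `Serializable`.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for CollectTransform: the only state is the buffer.
 * The output streams are created, used and closed inside close().
 */
final class CollectSketch<T> {
  private final String filename;
  private final List<T> list = new ArrayList<>();

  CollectSketch(final String filename) {
    this.filename = filename;
  }

  /** Buffer each element until close() writes them all at once. */
  void onData(final T element) {
    list.add(element);
  }

  /** Write the buffered elements; the streams are locals, not fields. */
  void close() {
    try (final FileOutputStream fos = new FileOutputStream(filename);
         final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
      oos.writeObject(list);  // the ArrayList itself is Serializable
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With the streams scoped to `close()`, the class holds no open file handles between `onData()` calls.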


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187041677
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-    final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
-
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
-  }
-
-  // Helper functions to add the initialized reader/writer to the maintained map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add input OutputCollectors to each {@link Task}.
+   * Input OutputCollector denotes all the OutputCollectors of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input OutputCollectors to.
+   */
+  private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
+    List parentTasks = taskGroupDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
 
-  private void addOutputWriter(final Task task, final OutputWriter outputWriter) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final OutputCollectorImpl parentOutputCollector = getTaskDataHandler(parent).getOutputCollector();
+        if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+          dataHandler.addSideInputFromThisStage(parentOutputCollector);
+        } else {
+          dataHandler.addInputFromThisStages(parentOutputCollector);
+        }
+      });
+    }
   }
 
   /**
-   * Executes the task group.
+   * Add output outputCollectors to each {@link Task}.
+   * Output outputCollector denotes the one and only one outputCollector of this task.
+   * Check the outgoing edges that will use this outputCollector,
+   * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
+   *
+   * @param task the Task to add output outputCollectors to.
    */
-  public void execute() {
-    LOG.info("{} Execution Started!", taskGroupId);
-    if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
-    }
-
-    taskGroupStateManager.onTaskGroupStateChanged(
-        TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private void setOutputCollector(final Task task, final TaskDataHandler dataHandler) {
+    final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-    taskGroupDag.topologicalDo(task -> {
-      final String physicalTaskId = getPhysicalTaskId(task.getId());
-      taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-      try {
-        if (task instanceof BoundedSourceTask) {
-          launchBoundedSourceTask((BoundedSourceTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof OperatorTask) {
-          launchOperatorTask((OperatorTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof MetricCollectionBarrierTask) {
-          launchMetricCollectionBarrierTask((MetricCollectionBarrierTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.ON_HOLD, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else {
-          throw new UnsupportedOperationException(task.toString());
-        }
-      } catch (final BlockFetchException ex) {
-        taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.FAILED_RECOVERABLE,
-
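The Javadoc of `addInputFromThisStage` and `setOutputCollector` in this diff describes the wiring: each task owns exactly one output collector, that collector is marked as a side input for every child reached through a side-input edge, and a child then sorts its parents' collectors into main inputs and side inputs. A minimal model of that logic, assuming plain string task ids and hypothetical `Collector`/`Handler` classes in place of `OutputCollectorImpl` and `TaskDataHandler`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of the intra-TaskGroup wiring described in the diff's Javadoc.
 * Collector and Handler stand in for OutputCollectorImpl and TaskDataHandler.
 */
final class IntraStageWiring {
  /** One collector per task; remembers which children read it as a side input. */
  static final class Collector {
    final String ownerTaskId;
    final Set<String> sideInputReceivers = new HashSet<>();

    Collector(final String ownerTaskId) {
      this.ownerTaskId = ownerTaskId;
    }
  }

  /** Main inputs and side inputs gathered for one task. */
  static final class Handler {
    final List<Collector> mainInputs = new ArrayList<>();
    final List<Collector> sideInputs = new ArrayList<>();
  }

  private final Map<String, Collector> collectors = new HashMap<>();
  private final Map<String, Handler> handlers = new HashMap<>();

  /** Like setOutputCollector: create the task's single collector and mark side-input children. */
  void setOutputCollector(final String taskId, final List<String> sideInputChildren) {
    final Collector collector = new Collector(taskId);
    collector.sideInputReceivers.addAll(sideInputChildren);
    collectors.put(taskId, collector);
    handlers.put(taskId, new Handler());
  }

  /** Like addInputFromThisStage: route each parent's collector to the side or main inputs. */
  void addInputFromThisStage(final String taskId, final List<String> parentTaskIds) {
    final Handler handler = handlers.get(taskId);
    for (final String parentId : parentTaskIds) {
      final Collector parentCollector = collectors.get(parentId);
      if (parentCollector.sideInputReceivers.contains(taskId)) {
        handler.sideInputs.add(parentCollector);
      } else {
        handler.mainInputs.add(parentCollector);
      }
    }
  }

  Handler handler(final String taskId) {
    return handlers.get(taskId);
  }
}
```

In this sketch, `setOutputCollector` must have run for every parent before `addInputFromThisStage` is called for a child (e.g. by visiting tasks in topological order); otherwise `collectors.get(parentId)` returns null.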

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187039938
 
 

 ##
 File path: compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
 ##
 @@ -22,44 +22,49 @@
 import java.io.FileOutputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.List;
 
 /**
  * Collect transform.
  * @param <T> type of data to collect.
  */
 public final class CollectTransform implements Transform {
   private String filename;
+  private FileOutputStream fos;
+  private ObjectOutputStream oos;
+  private final List list;
 
   /**
    * Constructor.
+   *
    * @param filename file to keep the result in.
    */
   public CollectTransform(final String filename) {
     this.filename = filename;
+    this.list = new ArrayList<>();
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector outputCollector) {
+  public void prepare(final Context context, final OutputCollector oc) {
     this.filename = filename + JavaRDD.getResultId();
   }
 
   @Override
-  public void onData(final Iterator elements, final String srcVertexId) {
+  public void onData(final T element) {
     // Write result to a temporary file.
     // TODO #740: remove this part, and make it properly transfer with executor.
-    try (final FileOutputStream fos = new FileOutputStream(filename)) {
-      try (final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
-        final ArrayList list = new ArrayList<>();
-        elements.forEachRemaining(list::add);
-        oos.writeObject(list);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    list.add(element);
   }
 
   @Override
   public void close() {
+    try {
 
 Review comment:
   Please use a try-with-resources statement for these streams.
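To illustrate what the try-with-resources statement buys here, a small self-contained demo with a hypothetical `TrackedResource` standing in for `FileOutputStream`/`ObjectOutputStream`. Resources declared in one `try (...)` header are closed automatically in reverse declaration order, and they are closed before the `catch` block runs, even when the body throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical resource that records when it is opened and closed. */
final class TrackedResource implements AutoCloseable {
  private final String name;
  private final List<String> log;

  TrackedResource(final String name, final List<String> log) {
    this.name = name;
    this.log = log;
    log.add("open " + name);
  }

  @Override
  public void close() {
    log.add("close " + name);
  }
}

final class TryWithResourcesDemo {
  /** Opens two chained resources, writes, and optionally fails inside the body. */
  static List<String> run(final boolean fail) {
    final List<String> log = new ArrayList<>();
    try (final TrackedResource fos = new TrackedResource("fos", log);
         final TrackedResource oos = new TrackedResource("oos", log)) {
      log.add("write");
      if (fail) {
        throw new IllegalStateException("write failed");
      }
    } catch (final IllegalStateException e) {
      // Both resources are already closed when this block runs.
      log.add("caught " + e.getMessage());
    }
    return log;
  }
}
```

`run(false)` logs `open fos, open oos, write, close oos, close fos`; `run(true)` logs the same followed by `caught write failed`. Nested `try` blocks or manual `finally` handling are not needed.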




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187038660
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -85,15 +83,16 @@ public RuntimeEdge getSideInputRuntimeEdge() {
     return sideInputRuntimeEdge;
   }
 
+  /**
+   * Set this OutputCollector as having side input for the given child task.
+   *
+   * @param physicalTaskId the id of child task whose side input will be put into this OutputCollector.
+   */
   public void setAsSideInputFor(final String physicalTaskId) {
     sideInputReceivers.add(physicalTaskId);
   }
 
   public boolean hasSideInputFor(final String physicalTaskId) {
 
 Review comment:
   Please add a comment for this method as well :)
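One possible shape for the requested comment, shown on a hypothetical stripped-down `SideInputTracker` that keeps only the side-input bookkeeping from the diff. The body of `hasSideInputFor` is not visible in the hunk; returning `sideInputReceivers.contains(physicalTaskId)` is an assumption.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical stand-in for OutputCollectorImpl, reduced to side-input bookkeeping.
 */
final class SideInputTracker {
  /** Ids of the child tasks that consume this collector's output as a side input. */
  private final Set<String> sideInputReceivers = new HashSet<>();

  /**
   * Set this OutputCollector as having side input for the given child task.
   *
   * @param physicalTaskId the id of the child task whose side input will be put into this OutputCollector.
   */
  void setAsSideInputFor(final String physicalTaskId) {
    sideInputReceivers.add(physicalTaskId);
  }

  /**
   * Check whether this OutputCollector holds side input for the given child task.
   *
   * @param physicalTaskId the id of the child task to check.
   * @return true if this collector's output is a side input of that task, false otherwise.
   */
  boolean hasSideInputFor(final String physicalTaskId) {
    return sideInputReceivers.contains(physicalTaskId);
  }
}
```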




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187041521
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187041511
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-05-09 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187041188
 
 

 ##
 File path: examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/DataSkewPolicyParallelismFive.java
 ##
 @@ -25,10 +25,10 @@
 /**
  * A data-skew policy with fixed parallelism 5 for tests.
  */
-public final class DataSkewPolicyParallelsimFive implements Policy {
+public final class DataSkewPolicyParallelismFive implements Policy{
 
 Review comment:
   Please restore the space between `Policy` and the opening brace.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277618
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277644
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277624
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
-} else {
-  throw new UnsupportedOperationException(task.toString());
-}
-  } catch (final BlockFetchException ex) {
-taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.FAILED_RECOVERABLE,
-
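
The `+` side of this hunk connects each task to its intra-stage parents. A parent's output collector becomes a side input if it was flagged as a side input for this task. Otherwise it becomes a main input. According to the `setOutputCollector` Javadoc, a task's single collector is flagged as a side input when any outgoing edge uses it that way.

Below is a minimal, self-contained sketch of that rule. It is not Nemo's code and rests on these assumptions:

* A plain map of parent IDs stands in for `taskGroupDag`.
* The hypothetical classes `SimpleCollector` and `SimpleHandler` stand in for `OutputCollectorImpl` and `TaskDataHandler`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Simplified model of intra-stage input wiring. SimpleCollector and SimpleHandler are
 * hypothetical stand-ins for OutputCollectorImpl and TaskDataHandler.
 */
public final class IntraStageWiringSketch {
  private IntraStageWiringSketch() {
  }

  /** One output collector per task; remembers which children read it as a side input. */
  static final class SimpleCollector {
    final Set<String> sideInputConsumers = new HashSet<>();

    boolean hasSideInputFor(final String taskId) {
      return sideInputConsumers.contains(taskId);
    }
  }

  /** Per-task handler: its own output collector plus the wired main and side inputs. */
  static final class SimpleHandler {
    final SimpleCollector outputCollector = new SimpleCollector();
    final List<SimpleCollector> mainInputs = new ArrayList<>();
    final List<SimpleCollector> sideInputs = new ArrayList<>();
  }

  /** Flags the collector as a side input for each child reached through a side-input edge. */
  static void markSideInputs(final SimpleCollector collector,
                             final Map<String, Boolean> childToIsSideInput) {
    childToIsSideInput.forEach((childId, isSideInput) -> {
      if (isSideInput) {
        collector.sideInputConsumers.add(childId);
      }
    });
  }

  /** Wires each intra-stage parent's collector into the task as a side or main input. */
  static void addInputFromThisStage(final String taskId,
                                    final Map<String, List<String>> parentsOf,
                                    final Map<String, SimpleHandler> handlers) {
    final List<String> parents = parentsOf.get(taskId);
    if (parents == null) {
      return; // no intra-stage parents, e.g. a source task
    }
    final SimpleHandler handler = handlers.get(taskId);
    for (final String parent : parents) {
      final SimpleCollector parentCollector = handlers.get(parent).outputCollector;
      if (parentCollector.hasSideInputFor(taskId)) {
        handler.sideInputs.add(parentCollector);
      } else {
        handler.mainInputs.add(parentCollector);
      }
    }
  }
}
```

A task with no entry in `parentsOf` is left unwired. This mirrors the `parentTasks != null` check in the quoted diff.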

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277561
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277614
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277425
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,14 +52,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
+  private final List inputReaders;
+  private final Map inputReaderToDataHandlersMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToDataHandlersMap;
+  private final Map iteratorIdToDataHandlersMap;
+  private final LinkedBlockingQueue> partitionQueue;
+  private List taskDataHandlers;
 
 Review comment:
   Do we have to maintain this list and `srcIteratorIdToDataHandlersMap`, 
`srcIteratorIdToDataHandlersMap` at the same time?
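
One way to resolve the question above is to keep a single map as the source of truth and derive the list view on demand, so the two cannot drift apart. Below is a hypothetical sketch. It uses generic `R` (reader) and `H` (data handler) types rather than Nemo's `InputReader` and `TaskDataHandler`, and it is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one map is the single source of truth; the list view is derived. */
public final class ReaderRegistrySketch<R, H> {
  // LinkedHashMap keeps registration order, so the derived reader list is deterministic.
  private final Map<R, List<H>> readerToHandlers = new LinkedHashMap<>();

  /** Registers a handler for a reader, creating the reader's entry on first use. */
  public void register(final R reader, final H handler) {
    readerToHandlers.computeIfAbsent(reader, r -> new ArrayList<>()).add(handler);
  }

  /** Derives the reader list on demand instead of maintaining a parallel field. */
  public List<R> readers() {
    return new ArrayList<>(readerToHandlers.keySet());
  }

  /** Returns a read-only view of a reader's handlers, or an empty list if it is unknown. */
  public List<H> handlersOf(final R reader) {
    return Collections.unmodifiableList(readerToHandlers.getOrDefault(reader, List.of()));
  }
}
```

The trade-off is an extra copy whenever the list is requested. In exchange, no code path can update the map but forget the list.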




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277348
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/RelayTransform.java
 ##
 @@ -35,13 +33,13 @@ public RelayTransform() {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector oc) {
-this.outputCollector = oc;
+  public void prepare(final Context context, final OutputCollector p) {
 
 Review comment:
   Let's change `p` to `oc` again. (for all other changes also)
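
For reference, a relay transform that keeps the `oc` parameter name and stores it in a field might look roughly like the sketch below. The nested `Collector` and `SimpleTransform` interfaces are hypothetical, trimmed-down stand-ins: they have no `Context` argument and are not Nemo's `Transform` API.

```java
/** Hypothetical, trimmed-down stand-ins for a transform and its output collector. */
public final class RelayTransformSketch {
  private RelayTransformSketch() {
  }

  /** Receives elements emitted by a transform. */
  interface Collector<O> {
    void emit(O output);
  }

  /** A transform is prepared once with its collector, then fed elements one at a time. */
  interface SimpleTransform<I, O> {
    void prepare(Collector<O> oc);

    void onData(I element);
  }

  /** Forwards every element unchanged to the collector supplied in prepare(). */
  static final class Relay<T> implements SimpleTransform<T, T> {
    private Collector<T> outputCollector;

    @Override
    public void prepare(final Collector<T> oc) {
      // The parameter keeps the short conventional name `oc`; the field name stays descriptive.
      this.outputCollector = oc;
    }

    @Override
    public void onData(final T element) {
      outputCollector.emit(element);
    }
  }
}
```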




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277628
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277650
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -46,11 +61,39 @@ public void emit(final String dstVertexId, final Object output) {
   }
 
   /**
-   * Collects the accumulated output and replace the output list.
+   * Inter-Task data is transferred from sender-side Task's OutputCollectorImpl to receiver-side Task.
    *
-   * @return the list of output elements.
+   * @return the first element of this list
    */
-  public List collectOutputList() {
-    return outputList.getAndSet(new ArrayList<>());
+  public O remove() {
 
 Review comment:
   Please check this comment again
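
One reading of this comment is that the new Javadoc describes inter-Task transfer in general rather than what `remove()` itself does. Below is a hypothetical sketch of a FIFO collector whose `remove()` Javadoc states the method's contract: which element is returned, and what happens when nothing is buffered. It assumes an `ArrayDeque` buffer, which is not necessarily how `OutputCollectorImpl` stores its output.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch of a FIFO output collector; not Nemo's OutputCollectorImpl.
 *
 * @param <O> output element type.
 */
public final class FifoOutputCollectorSketch<O> {
  // ArrayDeque does not accept null elements; emit(null) throws NullPointerException.
  private final Deque<O> buffer = new ArrayDeque<>();

  /** Appends an element emitted by the sender-side task. */
  public void emit(final O output) {
    buffer.addLast(output);
  }

  /**
   * Removes and returns the oldest buffered element, so the receiver-side task
   * consumes elements in the order they were emitted.
   *
   * @return the element at the head of the buffer.
   * @throws NoSuchElementException if the buffer is empty.
   */
  public O remove() {
    return buffer.removeFirst();
  }

  /** Returns true if no elements are buffered. */
  public boolean isEmpty() {
    return buffer.isEmpty();
  }
}
```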




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277564
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-17 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r175277526
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173050492
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,27 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
+  private final List inputReaders;
 
 Review comment:
   Why do we have to maintain the list and map of `InputReader`s at the same 
time?




[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173052570
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +196,467 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
-
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
-  }
-
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add input OutputCollectors to each {@link Task}.
+   * Input OutputCollector denotes all the OutputCollectors of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input OutputCollectors to.
+   */
+  private void addInputFromThisStage(final Task task) {
+    final TaskDataHandler dataHandler = taskToDataHandlerMap.get(task);
+    List parentTasks = taskGroupDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
 
-  private void addOutputWriter(final Task task, final OutputWriter outputWriter) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final OutputCollectorImpl parentOutputCollector = taskToDataHandlerMap.get(parent).getOutputCollector();
+        if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+          dataHandler.addSideInputFromThisStage(parentOutputCollector);
+        } else {
+          dataHandler.addInputFromThisStages(parentOutputCollector);
+          LOG.info("log: Added Output outputCollector of {} as InputPipe of {} {}",
+              getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+        }
+      });
+    }
   }
 
   /**
-   * Executes the task group.
+   * Add output outputCollectors to each {@link Task}.
+   * Output outputCollector denotes the one and only one outputCollector of this task.
+   * Check the outgoing edges that will use this outputCollector,
+   * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
+   *
+   * @param task the Task to add output outputCollectors to.
    */
-  public void execute() {
-    LOG.info("{} Execution Started!", taskGroupId);
-    if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
-    }
-
-    taskGroupStateManager.onTaskGroupStateChanged(
-        TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private void setOutputCollector(final Task task) {
+    final TaskDataHandler dataHandler = taskToDataHandlerMap.get(task);
+    final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-    taskGroupDag.topologicalDo(task -> {
-      final String physicalTaskId = getPhysicalTaskId(task.getId());
-      taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-      try {
-        if (task instanceof BoundedSourceTask) {
-          launchBoundedSourceTask((BoundedSourceTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof OperatorTask) {
-          launchOperatorTask((OperatorTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof MetricCollectionBarrierTask) {
-          launchMetricCollectionBarrierTask((MetricCollectionBarrierTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.ON_HOLD, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else {
-  
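The two new Javadoc comments in this hunk describe how intra-stage inputs are wired. A task's single output collector is flagged as a side input if any outgoing edge consumes it that way. Each child then files every parent's collector under either its side inputs or its main inputs. The stdlib-only sketch below models that routing under simplifying assumptions. `Edge`, `Collector` and `Handler` are hypothetical stand-ins, not Nemo's `RuntimeEdge`, `OutputCollectorImpl` or `TaskDataHandler`. The hunk is truncated before the real `setOutputCollector` body, so the logic here is inferred from the Javadoc only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the wiring described in the Javadoc.
// Edge, Collector and Handler are stand-ins, not Nemo classes.
final class IntraStageWiring {
  /** An outgoing intra-stage edge: destination task and whether it is a side input. */
  static final class Edge {
    final String dstTaskId;
    final boolean sideInput;

    Edge(final String dstTaskId, final boolean sideInput) {
      this.dstTaskId = dstTaskId;
      this.sideInput = sideInput;
    }
  }

  /** The one and only output collector of a task. */
  static final class Collector {
    final String ownerTaskId;
    private final Set<String> sideInputConsumers;

    Collector(final String ownerTaskId, final Set<String> sideInputConsumers) {
      this.ownerTaskId = ownerTaskId;
      this.sideInputConsumers = sideInputConsumers;
    }

    boolean hasSideInputFor(final String taskId) {
      return sideInputConsumers.contains(taskId);
    }

    /** True if ANY outgoing edge reads this collector as a side input. */
    boolean isSideInput() {
      return !sideInputConsumers.isEmpty();
    }
  }

  /** Per-task inputs, split into main inputs and side inputs. */
  static final class Handler {
    final List<Collector> mainInputs = new ArrayList<>();
    final List<Collector> sideInputs = new ArrayList<>();
  }

  /** Builds a task's collector by inspecting every outgoing edge. */
  static Collector setOutputCollector(final String taskId, final List<Edge> outgoingEdges) {
    final Set<String> sideConsumers = new HashSet<>();
    for (final Edge edge : outgoingEdges) {
      if (edge.sideInput) {
        sideConsumers.add(edge.dstTaskId);
      }
    }
    return new Collector(taskId, sideConsumers);
  }

  /** Routes each intra-stage parent's collector to the child's side or main inputs. */
  static void addInputFromThisStage(final String taskId, final List<Collector> parents, final Handler handler) {
    if (parents == null) { // mirrors the parentTasks != null check in the diff
      return;
    }
    for (final Collector parent : parents) {
      if (parent.hasSideInputFor(taskId)) {
        handler.sideInputs.add(parent);
      } else {
        handler.mainInputs.add(parent);
      }
    }
  }

  public static void main(final String[] args) {
    // "map" feeds "join" as a side input and "sink" as a main input.
    final Collector mapOut = setOutputCollector("map",
        Arrays.asList(new Edge("join", true), new Edge("sink", false)));
    final Handler join = new Handler();
    final Handler sink = new Handler();
    addInputFromThisStage("join", Collections.singletonList(mapOut), join);
    addInputFromThisStage("sink", Collections.singletonList(mapOut), sink);
    System.out.println(mapOut.isSideInput());   // true
    System.out.println(join.sideInputs.size()); // 1
    System.out.println(sink.mainInputs.size()); // 1
  }
}
```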

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173051102
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,27 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
+  private final List inputReaders;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> partitionQueue;
+  private Map outputCollectorToDstTasksMap;
+  private final Set finishedTaskIds;
+  private int numPartitions;
+  private Map logicalTaskIdToReadable;
+  private Map taskToDataHandlerMap;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
 
   private boolean isExecutionRequested;
+  private String logicalTaskIdPutOnHold;
+
+  private static final String ITERATORID_PREFIX = "ITERATOR_";
+  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
 
   public TaskGroupExecutor(final ScheduledTaskGroup scheduledTaskGroup,
 
 Review comment:
   Please add comments about the constructor and all methods.
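The sketch below is a hedged illustration of the requested documentation. It adds Javadoc to the private constructor and the single method of a hypothetical holder for the `ITERATORID_PREFIX` / `ITERATORID_GENERATOR` fields shown in the hunk. The hunk does not show how those fields are used, so `generateIteratorId()` is an assumption about their purpose.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not code from the PR): issues unique iterator IDs
 * from a fixed prefix and a process-wide counter, mirroring the
 * ITERATORID_PREFIX / ITERATORID_GENERATOR fields in the hunk.
 */
final class IteratorIdSketch {
  private static final String ITERATORID_PREFIX = "ITERATOR_";
  private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);

  /**
   * Private constructor: this holder only exposes static helpers.
   */
  private IteratorIdSketch() {
  }

  /**
   * Returns a fresh iterator ID such as {@code ITERATOR_0}.
   * getAndIncrement() is atomic, so concurrent callers never receive the same ID.
   *
   * @return the generated iterator ID.
   */
  static String generateIteratorId() {
    return ITERATORID_PREFIX + ITERATORID_GENERATOR.getAndIncrement();
  }

  public static void main(final String[] args) {
    System.out.println(generateIteratorId());
    System.out.println(generateIteratorId());
  }
}
```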


[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173049922
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.datatransfer;
+
+import edu.snu.nemo.runtime.common.plan.physical.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Per-Task data handler.
+ */
+public final class TaskDataHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskDataHandler.class.getName());
+
+  public TaskDataHandler(final Task task) {
+    this.task = task;
+    this.outputCollector = null;
+    this.outputWriters = new ArrayList<>();
+    this.inputFromThisStage = new ArrayList<>();
+    this.sideInputFromOtherStages = new ArrayList<>();
+    this.sideInputFromThisStage = new ArrayList<>();
+  }
+
+  private final Task task;
+  private OutputCollectorImpl outputCollector;
 
 Review comment:
   `Nullable`?
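The constructor assigns `this.outputCollector = null`, so the field is legitimately absent until something sets it. An annotation such as `javax.annotation.Nullable` would come from the external JSR-305 artifact. The stdlib-only sketch below shows one alternative: keep the field private and expose it through `Optional`, so callers must handle the unset case. `LazyCollectorHolder` and its methods are hypothetical, and the type parameter `C` stands in for `OutputCollectorImpl`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for TaskDataHandler's outputCollector handling;
// C replaces OutputCollectorImpl so the sketch compiles on its own.
final class LazyCollectorHolder<C> {
  // Null until setOutputCollector() is called; never handed out directly.
  private C outputCollector;

  /** Sets the collector once; rejects null and a second assignment. */
  void setOutputCollector(final C collector) {
    Objects.requireNonNull(collector, "collector");
    if (outputCollector != null) {
      throw new IllegalStateException("output collector already set");
    }
    outputCollector = collector;
  }

  /** Callers must handle the "not yet set" case explicitly. */
  Optional<C> getOutputCollector() {
    return Optional.ofNullable(outputCollector);
  }

  public static void main(final String[] args) {
    final LazyCollectorHolder<String> holder = new LazyCollectorHolder<>();
    System.out.println(holder.getOutputCollector().isPresent());     // false
    holder.setOutputCollector("collector-1");
    System.out.println(holder.getOutputCollector().orElse("unset")); // collector-1
  }
}
```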


[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173052642
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +196,467 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 

[GitHub] sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-07 Thread GitBox
sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r173049096
 
 

 ##
 File path: 
examples/beam/src/main/java/edu/snu/nemo/examples/beam/AlternatingLeastSquare.java
 ##
 @@ -270,7 +269,8 @@ public void processElement(final ProcessContext c) throws Exception {
      */
     @FinishBundle
     public void finishBundle(final FinishBundleContext c) {
-      results.forEach(r -> c.output(r, null, null));
+      KV resultElement = results.remove(0);
 
 Review comment:
   Is this an expected behavior?
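The hunk replaces `results.forEach(...)` with `results.remove(0)`. The old call emits every buffered result, while the new one takes only the first element. The hunk is cut off after that line, so the sketch below assumes the removed element is then emitted, and models both variants in plain Java. `FinishBundleSketch`, `emitAll` and `emitFirst` are hypothetical, as is the `Consumer` used in place of `c.output(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the two finishBundle variants in the hunk.
// "emit" stands in for c.output(...); Strings stand in for the KV results.
final class FinishBundleSketch {
  /** Old behavior: every buffered result is emitted. */
  static void emitAll(final List<String> results, final Consumer<String> emit) {
    results.forEach(emit);
  }

  /** New behavior, assuming the removed element is emitted: only the first result goes out. */
  static void emitFirst(final List<String> results, final Consumer<String> emit) {
    // List.remove(0) throws IndexOutOfBoundsException on an empty list.
    final String resultElement = results.remove(0);
    emit.accept(resultElement);
  }

  public static void main(final String[] args) {
    final List<String> emittedAll = new ArrayList<>();
    emitAll(new ArrayList<>(Arrays.asList("a", "b", "c")), emittedAll::add);

    final List<String> buffer = new ArrayList<>(Arrays.asList("a", "b", "c"));
    final List<String> emittedFirst = new ArrayList<>();
    emitFirst(buffer, emittedFirst::add);

    System.out.println(emittedAll);   // [a, b, c]
    System.out.println(emittedFirst); // [a]
    System.out.println(buffer);       // [b, c] (still buffered, not emitted)
  }
}
```

If the new variant is intentional, `remove(0)` also throws `IndexOutOfBoundsException` when `results` is empty, which the old `forEach` did not.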

