sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187041511
 
 

 ##########
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##########
 @@ -145,303 +212,407 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
-
-  private void createLocalWriter(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
-  }
-
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add input OutputCollectors to each {@link Task}.
+   * Input OutputCollector denotes all the OutputCollectors of intra-Stage 
parent tasks of this task.
+   *
+   * @param task the Task to add input OutputCollectors to.
+   */
+  private void addInputFromThisStage(final Task task, final TaskDataHandler 
dataHandler) {
+    List<Task> parentTasks = taskGroupDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-    physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final OutputCollectorImpl parentOutputCollector = 
getTaskDataHandler(parent).getOutputCollector();
+        if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+          dataHandler.addSideInputFromThisStage(parentOutputCollector);
+        } else {
+          dataHandler.addInputFromThisStages(parentOutputCollector);
+        }
+      });
+    }
   }
 
   /**
-   * Executes the task group.
+   * Add output outputCollectors to each {@link Task}.
+   * Output outputCollector denotes the one and only one outputCollector of 
this task.
+   * Check the outgoing edges that will use this outputCollector,
+   * and set this outputCollector as side input if any one of the edges uses 
this outputCollector as side input.
+   *
+   * @param task the Task to add output outputCollectors to.
    */
-  public void execute() {
-    LOG.info("{} Execution Started!", taskGroupId);
-    if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-    } else {
-      isExecutionRequested = true;
-    }
-
-    taskGroupStateManager.onTaskGroupStateChanged(
-        TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private void setOutputCollector(final Task task, final TaskDataHandler 
dataHandler) {
+    final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-    taskGroupDag.topologicalDo(task -> {
-      final String physicalTaskId = getPhysicalTaskId(task.getId());
-      taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-      try {
-        if (task instanceof BoundedSourceTask) {
-          launchBoundedSourceTask((BoundedSourceTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof OperatorTask) {
-          launchOperatorTask((OperatorTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof MetricCollectionBarrierTask) {
-          launchMetricCollectionBarrierTask((MetricCollectionBarrierTask) 
task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.ON_HOLD, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else {
-          throw new UnsupportedOperationException(task.toString());
-        }
-      } catch (final BlockFetchException ex) {
-        taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.FAILED_RECOVERABLE,
-            
Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
-        LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
-            new Object[] {taskGroupId, ex.toString()});
-      } catch (final BlockWriteException ex2) {
-        taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.FAILED_RECOVERABLE,
-            
Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-        LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
-            new Object[] {taskGroupId, ex2.toString()});
-      } catch (final Exception e) {
-        taskGroupStateManager.onTaskStateChanged(
-            physicalTaskId, TaskState.State.FAILED_UNRECOVERABLE, 
Optional.empty());
-        throw new RuntimeException(e);
+    taskGroupDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+      if (outEdge.isSideInput()) {
+        outputCollector.setSideInputRuntimeEdge(outEdge);
+        outputCollector.setAsSideInputFor(physicalTaskId);
       }
     });
+
+    dataHandler.setOutputCollector(outputCollector);
   }
 
-  /**
-   * Processes a BoundedSourceTask.
-   *
-   * @param boundedSourceTask the bounded source task to execute
-   * @throws Exception occurred during input read.
-   */
-  private void launchBoundedSourceTask(final BoundedSourceTask 
boundedSourceTask) throws Exception {
-    final String physicalTaskId = getPhysicalTaskId(boundedSourceTask.getId());
-    final Map<String, Object> metric = new HashMap<>();
-    metricCollector.beginMeasurement(physicalTaskId, metric);
+  private boolean hasOutputWriter(final Task task) {
+    return !getTaskDataHandler(task).getOutputWriters().isEmpty();
+  }
 
-    final long readStartTime = System.currentTimeMillis();
-    final Readable readable = boundedSourceTask.getReadable();
-    final Iterable readData = readable.read();
-    final long readEndTime = System.currentTimeMillis();
-    metric.put("BoundedSourceReadTime(ms)", readEndTime - readStartTime);
+  private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
+    logicalTaskIdPutOnHold = 
RuntimeIdGenerator.getLogicalTaskIdIdFromPhysicalTaskId(physicalTaskId);
+  }
 
+  private void writeAndCloseOutputWriters(final Task task) {
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
     final List<Long> writtenBytesList = new ArrayList<>();
-    for (final OutputWriter outputWriter : 
physicalTaskIdToOutputWriterMap.get(physicalTaskId)) {
-      outputWriter.write(readData);
+    final Map<String, Object> metric = new HashMap<>();
+    metricCollector.beginMeasurement(physicalTaskId, metric);
+    final long writeStartTime = System.currentTimeMillis();
+
+    getTaskDataHandler(task).getOutputWriters().forEach(outputWriter -> {
+      outputWriter.write();
       outputWriter.close();
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
-    }
+    });
+
     final long writeEndTime = System.currentTimeMillis();
-    metric.put("OutputWriteTime(ms)", writeEndTime - readEndTime);
+    metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
     putWrittenBytesMetric(writtenBytesList, metric);
     metricCollector.endMeasurement(physicalTaskId, metric);
   }
 
-  /**
-   * Processes an OperatorTask.
-   * @param operatorTask to execute
-   */
-  private void launchOperatorTask(final OperatorTask operatorTask) {
-    final Map<Transform, Object> sideInputMap = new HashMap<>();
-    final List<DataUtil.IteratorWithNumBytes> sideInputIterators = new 
ArrayList<>();
-    final String physicalTaskId = getPhysicalTaskId(operatorTask.getId());
+  private void prepareInputFromSource() {
+    taskGroupDag.topologicalDo(task -> {
+      if (task instanceof BoundedSourceTask) {
+        try {
+          final String iteratorId = generateIteratorId();
+          final Iterator iterator = ((BoundedSourceTask) 
task).getReadable().read().iterator();
+          idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
+          srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new 
ArrayList<>());
+          
srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
+        } catch (final BlockFetchException ex) {
+          
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
+              Optional.empty(), 
Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
+          LOG.info("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}",
+              taskGroupId, ex.toString());
+        } catch (final Exception e) {
+          
taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
+              Optional.empty(), Optional.empty());
+          LOG.info("{} Execution Failed! Exception: {}", taskGroupId, 
e.toString());
 
 Review comment:
   Please take another look at my earlier comment on this part of the code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to