sanha commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r187042041
##########
File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
##########
@@ -46,11 +52,47 @@ public void emit(final String dstVertexId, final Object output) {
   }
 
   /**
-   * Collects the accumulated output and replace the output list.
+   * Inter-Task data is transferred from sender-side Task's OutputCollectorImpl
+   * to receiver-side Task.
+   *
+   * @return the first element of this list
+   */
+  public O remove() {
+    return outputQueue.remove();
+  }
+
+  public boolean isEmpty() {
+    return outputQueue.isEmpty();
+  }
+
+  public int size() {
+    return outputQueue.size();
+  }
+
+  /**
+   * Mark this edge as side input so that TaskGroupExecutor can retrieve
+   * source transform using it.
    *
-   * @return the list of output elements.
+   * @param edge the RuntimeEdge to mark as side input.
    */
-  public List<O> collectOutputList() {
-    return outputList.getAndSet(new ArrayList<>());
+  public void setSideInputRuntimeEdge(final RuntimeEdge edge) {
+    sideInputRuntimeEdge = edge;
+  }
+
+  public RuntimeEdge getSideInputRuntimeEdge() {
+    return sideInputRuntimeEdge;
+  }
+
+  /**
+   * Set this OutputCollector as having side input for the given child task.
+   *
+   * @param physicalTaskId the id of child task whose side input will be put into this OutputCollector.
+   */
+  public void setAsSideInputFor(final String physicalTaskId) {
+    sideInputReceivers.add(physicalTaskId);
+  }
+

Review comment:
Please add a comment :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services