shpark closed pull request #75: [NEMO-151] Add OutputWriters for additional tagged outputs URL: https://github.com/apache/incubator-nemo/pull/75
This is a PR merged from a forked repository. Because GitHub hides the original diff on merge (and would not otherwise show it for a pull request from a fork), the diff is supplied below for the sake of provenance: diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java index 5613e3ca6..d94797a49 100644 --- a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java +++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java @@ -20,10 +20,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.FlatMapElements; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.*; import java.util.Arrays; @@ -52,11 +49,11 @@ public static void main(final String[] args) { // {} here is required for preserving type information. // Please see https://stackoverflow.com/a/48431397 for details. 
- final TupleTag<String> shortWordsTag = new TupleTag<String>() { + final TupleTag<KV<Integer, String>> shortWordsTag = new TupleTag<KV<Integer, String>>("short") { }; - final TupleTag<Integer> longWordsTag = new TupleTag<Integer>() { + final TupleTag<KV<Integer, String>> longWordsTag = new TupleTag<KV<Integer, String>>("long") { }; - final TupleTag<String> veryLongWordsTag = new TupleTag<String>() { + final TupleTag<String> veryLongWordsTag = new TupleTag<String>("very long") { }; final Pipeline p = Pipeline.create(options); @@ -70,27 +67,44 @@ public static void main(final String[] args) { @ProcessElement public void processElement(final ProcessContext c) { String word = c.element(); - if (word.length() < 5) { - c.output(shortWordsTag, word); - } else if (word.length() < 8) { - c.output(longWordsTag, word.length()); + if (word.length() < 6) { + c.output(shortWordsTag, KV.of(word.length(), word)); + } else if (word.length() < 11) { + c.output(longWordsTag, KV.of(word.length(), word)); } else { - c.output(veryLongWordsTag, word); + c.output(word); } } }).withOutputTags(veryLongWordsTag, TupleTagList - .of(longWordsTag) - .and(shortWordsTag))); + .of(shortWordsTag).and(longWordsTag))); - PCollection<String> shortWords = results.get(shortWordsTag); - PCollection<String> longWordLengths = results - .get(longWordsTag) - .apply(MapElements.into(TypeDescriptors.strings()).via(i -> Integer.toString(i))); + PCollection<String> shortWords = results.get(shortWordsTag) + .apply(GroupByKey.create()) + .apply(MapElements.via(new FormatLines())); + PCollection<String> longWords = results.get(longWordsTag) + .apply(GroupByKey.create()) + .apply(MapElements.via(new FormatLines())); PCollection<String> veryLongWords = results.get(veryLongWordsTag); GenericSourceSink.write(shortWords, outputFilePath + "_short"); - GenericSourceSink.write(longWordLengths, outputFilePath + "_long"); + GenericSourceSink.write(longWords, outputFilePath + "_long"); GenericSourceSink.write(veryLongWords, 
outputFilePath + "_very_long"); p.run(); } + + /** + * Formats a key-value pair to a string. + */ + static class FormatLines extends SimpleFunction<KV<Integer, Iterable<String>>, String> { + @Override + public String apply(final KV<Integer, Iterable<String>> input) { + final int length = input.getKey(); + final StringBuilder sb = new StringBuilder(); + for (final String word : input.getValue()) { + sb.append(length).append(": ").append(word).append('\n'); + } + + return sb.toString(); + } + } } diff --git a/examples/resources/sample_input_tag b/examples/resources/sample_input_tag index 0cd417beb..37ac499d3 100644 --- a/examples/resources/sample_input_tag +++ b/examples/resources/sample_input_tag @@ -1,7 +1,18 @@ +a +to foo bar +that +ipsum +dolor foobar barbaz +abcdefg +fooipsum foobarbaz +loren +this +foobarbazqux +bazquxfoobar +qux ipsumlorem -qux \ No newline at end of file diff --git a/examples/resources/test_output_tag_long b/examples/resources/test_output_tag_long index 91dea2c76..78c88bfda 100644 --- a/examples/resources/test_output_tag_long +++ b/examples/resources/test_output_tag_long @@ -1,2 +1,11 @@ -6 -6 +6: foobar +6: barbaz + +7: abcdefg + +8: fooipsum + +9: foobarbaz + +10: ipsumlorem + diff --git a/examples/resources/test_output_tag_short b/examples/resources/test_output_tag_short index 72594ed96..d9d497b49 100644 --- a/examples/resources/test_output_tag_short +++ b/examples/resources/test_output_tag_short @@ -1,3 +1,15 @@ -foo -bar -qux +1: a + +2: to + +3: foo +3: bar +3: qux + +4: that +4: this + +5: ipsum +5: dolor +5: loren + diff --git a/examples/resources/test_output_tag_very_long b/examples/resources/test_output_tag_very_long index 22a28156a..f562b43e6 100644 --- a/examples/resources/test_output_tag_very_long +++ b/examples/resources/test_output_tag_very_long @@ -1,2 +1,2 @@ -foobarbaz -ipsumlorem +foobarbazqux +bazquxfoobar diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java index bfcd04096..7944bb9e9 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java @@ -162,13 +162,18 @@ public TaskExecutor(final Task task, .collect(Collectors.toList()); // Handle writes - final List<OutputWriter> childrenTaskWriters = getChildrenTaskWriters( - taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory); // Children-task write + // Main output children task writes + final List<OutputWriter> mainChildrenTaskWriters = getMainChildrenTaskWriters( + taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap); + // Additional output children task writes + final Map<String, OutputWriter> additionalChildrenTaskWriters = getAdditionalChildrenTaskWriters( + taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap); final List<String> additionalOutputVertices = new ArrayList<>(additionalOutputMap.values()); final OutputCollectorImpl oci = new OutputCollectorImpl(additionalOutputVertices); + // intra-vertex writes final VertexHarness vertexHarness = new VertexHarness(irVertex, oci, children, - isToSideInputs, isToAdditionalTagOutputs, - childrenTaskWriters, new ContextImpl(sideInputMap, additionalOutputMap)); // Intra-vertex write + isToSideInputs, isToAdditionalTagOutputs, mainChildrenTaskWriters, additionalChildrenTaskWriters, + new ContextImpl(sideInputMap, additionalOutputMap)); prepareTransform(vertexHarness); vertexIdToHarness.put(irVertex.getId(), vertexHarness); @@ -222,11 +227,13 @@ private void processElementRecursively(final VertexHarness vertexHarness, final } // Recursively process all of the additional output elements. 
- vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> { - while (!outputCollector.isEmpty(tag)) { - final Object element = outputCollector.remove(tag); - handleAdditionalOutputElement(vertexHarness, element, tag); // Recursion - } + vertexHarness.getIRVertex().getPropertyValue(AdditionalTagOutputProperty.class).ifPresent(tagToVertex -> { + tagToVertex.values().forEach(tag -> { + while (!outputCollector.isEmpty(tag)) { + final Object element = outputCollector.remove(tag); + handleAdditionalOutputElement(vertexHarness, element, tag); // Recursion + } + }); }); } @@ -317,28 +324,49 @@ private void doExecute() { private void finalizeVertex(final VertexHarness vertexHarness) { closeTransform(vertexHarness); - while (!vertexHarness.getOutputCollector().isEmpty()) { - final Object element = vertexHarness.getOutputCollector().remove(); + final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector(); + + // handle main outputs + while (!outputCollector.isEmpty()) { + final Object element = outputCollector.remove(); handleMainOutputElement(vertexHarness, element); } + + // handle additional tagged outputs + vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> { + while (!outputCollector.isEmpty(tag)) { + final Object element = outputCollector.remove(tag); + handleAdditionalOutputElement(vertexHarness, element, tag); + } + }); finalizeOutputWriters(vertexHarness); } private void handleMainOutputElement(final VertexHarness harness, final Object element) { - harness.getWritersToChildrenTasks().forEach(outputWriter -> { + // writes to children tasks + harness.getWritersToChildrenTasks().forEach(outputWriter -> { outputWriter.write(element); }); + // writes to side input children tasks if (harness.getSideInputChildren().size() > 0) { sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element); } + // process elements in the next vertices within a task 
harness.getNonSideInputChildren().forEach(child -> processElementRecursively(child, element)); } private void handleAdditionalOutputElement(final VertexHarness harness, final Object element, final String tag) { - // Inter-task writes are currently not supported. + // writes to additional children tasks + harness.getWritersToAdditionalChildrenTasks().entrySet().stream() + .filter(kv -> kv.getKey().equals(tag)) + .forEach(kv -> { + kv.getValue().write(element); + }); + // writes to side input children tasks if (harness.getSideInputChildren().size() > 0) { sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element); } + // process elements in the next vertices within a task harness.getAdditionalTagOutputChildren().entrySet().stream() .filter(kv -> kv.getKey().equals(tag)) .forEach(kv -> processElementRecursively(kv.getValue(), element)); @@ -350,8 +378,9 @@ private void handleAdditionalOutputElement(final VertexHarness harness, final Ob */ private boolean handleDataFetchers(final List<DataFetcher> fetchers) { final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers); - int finishedFetcherIndex = NONE_FINISHED; + int finishedFetcherIndex; while (!availableFetchers.isEmpty()) { // empty means we've consumed all task-external input data + finishedFetcherIndex = NONE_FINISHED; for (int i = 0; i < availableFetchers.size(); i++) { final DataFetcher dataFetcher = fetchers.get(i); final Object element; @@ -417,18 +446,40 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) { .collect(Collectors.toList()); } - private List<OutputWriter> getChildrenTaskWriters(final int taskIndex, - final IRVertex irVertex, - final List<StageEdge> outEdgesToChildrenTasks, - final DataTransferFactory dataTransferFactory) { + private List<OutputWriter> getMainChildrenTaskWriters(final int taskIndex, + final IRVertex irVertex, + final List<StageEdge> outEdgesToChildrenTasks, + final DataTransferFactory dataTransferFactory, + final 
Map<String, String> taggedOutputs) { return outEdgesToChildrenTasks .stream() .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId())) + .filter(outEdge -> !taggedOutputs.containsValue(outEdge.getDstIRVertex().getId())) .map(outEdgeForThisVertex -> dataTransferFactory .createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex)) .collect(Collectors.toList()); } + private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final int taskIndex, + final IRVertex irVertex, + final List<StageEdge> outEdgesChildrenTasks, + final DataTransferFactory dataTransferFactory, + final Map<String, String> taggedOutputs) { + final Map<String, OutputWriter> additionalChildrenTaskWriters = new HashMap<>(); + + outEdgesChildrenTasks + .stream() + .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId())) + .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId())) + .forEach(outEdgeForThisVertex -> { + additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(), + dataTransferFactory.createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), + outEdgeForThisVertex)); + }); + + return additionalChildrenTaskWriters; + } + private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex, final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag, final Map<String, VertexHarness> vertexIdToHarness) { @@ -485,12 +536,20 @@ private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex) private void finalizeOutputWriters(final VertexHarness vertexHarness) { final List<Long> writtenBytesList = new ArrayList<>(); - vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> { + // finalize OutputWriters for main children + vertexHarness.getWritersToMainChildrenTasks().forEach(outputWriter -> { outputWriter.close(); final Optional<Long> writtenBytes = outputWriter.getWrittenBytes(); writtenBytes.ifPresent(writtenBytesList::add); 
}); + // finalize OutputWriters for additional tagged children + vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> { + outputWriter.close(); + final Optional<Long> writtennBytes = outputWriter.getWrittenBytes(); + writtennBytes.ifPresent(writtenBytesList::add); + }); + long totalWrittenBytes = 0; for (final Long writtenBytes : writtenBytesList) { totalWrittenBytes += writtenBytes; diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java index c5f9a7850..c79b53020 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java @@ -38,14 +38,16 @@ private final List<VertexHarness> sideInputChildren; private final List<VertexHarness> nonSideInputChildren; private final Map<String, VertexHarness> additionalTagOutputChildren; - private final List<OutputWriter> writersToChildrenTasks; + private final List<OutputWriter> writersToMainChildrenTasks; + private final Map<String, OutputWriter> writersToAdditionalChildrenTasks; VertexHarness(final IRVertex irVertex, final OutputCollectorImpl outputCollector, final List<VertexHarness> children, final List<Boolean> isSideInputs, final List<Boolean> isAdditionalTagOutputs, - final List<OutputWriter> writersToChildrenTasks, + final List<OutputWriter> writersToMainChildrenTasks, + final Map<String, OutputWriter> writersToAdditionalChildrenTasks, final Transform.Context context) { this.irVertex = irVertex; this.outputCollector = outputCollector; @@ -71,7 +73,8 @@ this.sideInputChildren = sides; this.nonSideInputChildren = nonSides; this.additionalTagOutputChildren = tagged; - this.writersToChildrenTasks = writersToChildrenTasks; + this.writersToMainChildrenTasks = writersToMainChildrenTasks; + this.writersToAdditionalChildrenTasks = 
writersToAdditionalChildrenTasks; this.context = context; } @@ -111,10 +114,17 @@ OutputCollectorImpl getOutputCollector() { } /** - * @return OutputWriters of this irVertex. (empty if none exists) + * @return OutputWriters for main outputs of this irVertex. (empty if none exists) */ - List<OutputWriter> getWritersToChildrenTasks() { - return writersToChildrenTasks; + List<OutputWriter> getWritersToMainChildrenTasks() { + return writersToMainChildrenTasks; + } + + /** + * @return OutputWriters for additional tagged outputs of this irVertex. (empty if none exists) + */ + Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() { + return writersToAdditionalChildrenTasks; } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services