shpark closed pull request #75: [NEMO-151] Add OutputWriters for additional tagged outputs
URL: https://github.com/apache/incubator-nemo/pull/75
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
index 5613e3ca6..d94797a49 100644
--- 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
+++ 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
@@ -20,10 +20,7 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.*;
 
 import java.util.Arrays;
@@ -52,11 +49,11 @@ public static void main(final String[] args) {
 
     // {} here is required for preserving type information.
     // Please see https://stackoverflow.com/a/48431397 for details.
-    final TupleTag<String> shortWordsTag = new TupleTag<String>() {
+    final TupleTag<KV<Integer, String>> shortWordsTag = new 
TupleTag<KV<Integer, String>>("short") {
     };
-    final TupleTag<Integer> longWordsTag = new TupleTag<Integer>() {
+    final TupleTag<KV<Integer, String>> longWordsTag = new 
TupleTag<KV<Integer, String>>("long") {
     };
-    final TupleTag<String> veryLongWordsTag = new TupleTag<String>() {
+    final TupleTag<String> veryLongWordsTag = new TupleTag<String>("very 
long") {
     };
 
     final Pipeline p = Pipeline.create(options);
@@ -70,27 +67,44 @@ public static void main(final String[] args) {
           @ProcessElement
           public void processElement(final ProcessContext c) {
             String word = c.element();
-            if (word.length() < 5) {
-              c.output(shortWordsTag, word);
-            } else if (word.length() < 8) {
-              c.output(longWordsTag, word.length());
+            if (word.length() < 6) {
+              c.output(shortWordsTag, KV.of(word.length(), word));
+            } else if (word.length() < 11) {
+              c.output(longWordsTag, KV.of(word.length(), word));
             } else {
-              c.output(veryLongWordsTag, word);
+              c.output(word);
             }
           }
         }).withOutputTags(veryLongWordsTag, TupleTagList
-            .of(longWordsTag)
-            .and(shortWordsTag)));
+            .of(shortWordsTag).and(longWordsTag)));
 
-    PCollection<String> shortWords = results.get(shortWordsTag);
-    PCollection<String> longWordLengths = results
-        .get(longWordsTag)
-        .apply(MapElements.into(TypeDescriptors.strings()).via(i -> 
Integer.toString(i)));
+    PCollection<String> shortWords = results.get(shortWordsTag)
+        .apply(GroupByKey.create())
+        .apply(MapElements.via(new FormatLines()));
+    PCollection<String> longWords = results.get(longWordsTag)
+        .apply(GroupByKey.create())
+        .apply(MapElements.via(new FormatLines()));
     PCollection<String> veryLongWords = results.get(veryLongWordsTag);
 
     GenericSourceSink.write(shortWords, outputFilePath + "_short");
-    GenericSourceSink.write(longWordLengths, outputFilePath + "_long");
+    GenericSourceSink.write(longWords, outputFilePath + "_long");
     GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long");
     p.run();
   }
+
+  /**
+   * Formats a key-value pair to a string.
+   */
+  static class FormatLines extends SimpleFunction<KV<Integer, 
Iterable<String>>, String> {
+    @Override
+    public String apply(final KV<Integer, Iterable<String>> input) {
+      final int length = input.getKey();
+      final StringBuilder sb = new StringBuilder();
+      for (final String word : input.getValue()) {
+        sb.append(length).append(": ").append(word).append('\n');
+      }
+
+      return sb.toString();
+    }
+  }
 }
diff --git a/examples/resources/sample_input_tag 
b/examples/resources/sample_input_tag
index 0cd417beb..37ac499d3 100644
--- a/examples/resources/sample_input_tag
+++ b/examples/resources/sample_input_tag
@@ -1,7 +1,18 @@
+a
+to
 foo
 bar
+that
+ipsum
+dolor
 foobar
 barbaz
+abcdefg
+fooipsum
 foobarbaz
+loren
+this
+foobarbazqux
+bazquxfoobar
+qux
 ipsumlorem
-qux
\ No newline at end of file
diff --git a/examples/resources/test_output_tag_long 
b/examples/resources/test_output_tag_long
index 91dea2c76..78c88bfda 100644
--- a/examples/resources/test_output_tag_long
+++ b/examples/resources/test_output_tag_long
@@ -1,2 +1,11 @@
-6
-6
+6: foobar
+6: barbaz
+
+7: abcdefg
+
+8: fooipsum
+
+9: foobarbaz
+
+10: ipsumlorem
+
diff --git a/examples/resources/test_output_tag_short 
b/examples/resources/test_output_tag_short
index 72594ed96..d9d497b49 100644
--- a/examples/resources/test_output_tag_short
+++ b/examples/resources/test_output_tag_short
@@ -1,3 +1,15 @@
-foo
-bar
-qux
+1: a
+
+2: to
+
+3: foo
+3: bar
+3: qux
+
+4: that
+4: this
+
+5: ipsum
+5: dolor
+5: loren
+
diff --git a/examples/resources/test_output_tag_very_long 
b/examples/resources/test_output_tag_very_long
index 22a28156a..f562b43e6 100644
--- a/examples/resources/test_output_tag_very_long
+++ b/examples/resources/test_output_tag_very_long
@@ -1,2 +1,2 @@
-foobarbaz
-ipsumlorem
+foobarbazqux
+bazquxfoobar
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index bfcd04096..7944bb9e9 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -162,13 +162,18 @@ public TaskExecutor(final Task task,
           .collect(Collectors.toList());
 
       // Handle writes
-      final List<OutputWriter> childrenTaskWriters = getChildrenTaskWriters(
-          taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory); // Children-task write
+      // Main output children task writes
+      final List<OutputWriter> mainChildrenTaskWriters = 
getMainChildrenTaskWriters(
+          taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
+      // Additional output children task writes
+      final Map<String, OutputWriter> additionalChildrenTaskWriters = 
getAdditionalChildrenTaskWriters(
+          taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
       final List<String> additionalOutputVertices = new 
ArrayList<>(additionalOutputMap.values());
       final OutputCollectorImpl oci = new 
OutputCollectorImpl(additionalOutputVertices);
+      // intra-vertex writes
       final VertexHarness vertexHarness = new VertexHarness(irVertex, oci, 
children,
-          isToSideInputs, isToAdditionalTagOutputs,
-          childrenTaskWriters, new ContextImpl(sideInputMap, 
additionalOutputMap)); // Intra-vertex write
+          isToSideInputs, isToAdditionalTagOutputs, mainChildrenTaskWriters, 
additionalChildrenTaskWriters,
+          new ContextImpl(sideInputMap, additionalOutputMap));
       prepareTransform(vertexHarness);
       vertexIdToHarness.put(irVertex.getId(), vertexHarness);
 
@@ -222,11 +227,13 @@ private void processElementRecursively(final 
VertexHarness vertexHarness, final
     }
 
     // Recursively process all of the additional output elements.
-    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      while (!outputCollector.isEmpty(tag)) {
-        final Object element = outputCollector.remove(tag);
-        handleAdditionalOutputElement(vertexHarness, element, tag); // 
Recursion
-      }
+    
vertexHarness.getIRVertex().getPropertyValue(AdditionalTagOutputProperty.class).ifPresent(tagToVertex
 -> {
+      tagToVertex.values().forEach(tag -> {
+        while (!outputCollector.isEmpty(tag)) {
+          final Object element = outputCollector.remove(tag);
+          handleAdditionalOutputElement(vertexHarness, element, tag); // 
Recursion
+        }
+      });
     });
   }
 
@@ -317,28 +324,49 @@ private void doExecute() {
 
   private void finalizeVertex(final VertexHarness vertexHarness) {
     closeTransform(vertexHarness);
-    while (!vertexHarness.getOutputCollector().isEmpty()) {
-      final Object element = vertexHarness.getOutputCollector().remove();
+    final OutputCollectorImpl outputCollector = 
vertexHarness.getOutputCollector();
+
+    // handle main outputs
+    while (!outputCollector.isEmpty()) {
+      final Object element = outputCollector.remove();
       handleMainOutputElement(vertexHarness, element);
     }
+
+    // handle additional tagged outputs
+    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
+      while (!outputCollector.isEmpty(tag)) {
+        final Object element = outputCollector.remove(tag);
+        handleAdditionalOutputElement(vertexHarness, element, tag);
+      }
+    });
     finalizeOutputWriters(vertexHarness);
   }
 
   private void handleMainOutputElement(final VertexHarness harness, final 
Object element) {
-    harness.getWritersToChildrenTasks().forEach(outputWriter -> {
+    // writes to children tasks
+      harness.getWritersToChildrenTasks().forEach(outputWriter -> {
       outputWriter.write(element);
     });
+    // writes to side input children tasks
     if (harness.getSideInputChildren().size() > 0) {
       sideInputMap.put(((OperatorVertex) 
harness.getIRVertex()).getTransform().getTag(), element);
     }
+    // process elements in the next vertices within a task
     harness.getNonSideInputChildren().forEach(child -> 
processElementRecursively(child, element));
   }
 
   private void handleAdditionalOutputElement(final VertexHarness harness, 
final Object element, final String tag) {
-    // Inter-task writes are currently not supported.
+    // writes to additional children tasks
+    harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
+        .filter(kv -> kv.getKey().equals(tag))
+        .forEach(kv -> {
+          kv.getValue().write(element);
+        });
+    // writes to side input children tasks
     if (harness.getSideInputChildren().size() > 0) {
       sideInputMap.put(((OperatorVertex) 
harness.getIRVertex()).getTransform().getTag(), element);
     }
+    // process elements in the next vertices within a task
     harness.getAdditionalTagOutputChildren().entrySet().stream()
         .filter(kv -> kv.getKey().equals(tag))
         .forEach(kv -> processElementRecursively(kv.getValue(), element));
@@ -350,8 +378,9 @@ private void handleAdditionalOutputElement(final 
VertexHarness harness, final Ob
    */
   private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
     final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
-    int finishedFetcherIndex = NONE_FINISHED;
+    int finishedFetcherIndex;
     while (!availableFetchers.isEmpty()) { // empty means we've consumed all 
task-external input data
+      finishedFetcherIndex = NONE_FINISHED;
       for (int i = 0; i < availableFetchers.size(); i++) {
         final DataFetcher dataFetcher = fetchers.get(i);
         final Object element;
@@ -417,18 +446,40 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
         .collect(Collectors.toList());
   }
 
-  private List<OutputWriter> getChildrenTaskWriters(final int taskIndex,
-                                                    final IRVertex irVertex,
-                                                    final List<StageEdge> 
outEdgesToChildrenTasks,
-                                                    final DataTransferFactory 
dataTransferFactory) {
+  private List<OutputWriter> getMainChildrenTaskWriters(final int taskIndex,
+                                                        final IRVertex 
irVertex,
+                                                        final List<StageEdge> 
outEdgesToChildrenTasks,
+                                                        final 
DataTransferFactory dataTransferFactory,
+                                                        final Map<String, 
String> taggedOutputs) {
     return outEdgesToChildrenTasks
         .stream()
         .filter(outEdge -> 
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
+        .filter(outEdge -> 
!taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
         .map(outEdgeForThisVertex -> dataTransferFactory
             .createWriter(irVertex, taskIndex, 
outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
         .collect(Collectors.toList());
   }
 
+  private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final int 
taskIndex,
+                                                                     final 
IRVertex irVertex,
+                                                                     final 
List<StageEdge> outEdgesChildrenTasks,
+                                                                     final 
DataTransferFactory dataTransferFactory,
+                                                                     final 
Map<String, String> taggedOutputs) {
+    final Map<String, OutputWriter> additionalChildrenTaskWriters = new 
HashMap<>();
+
+    outEdgesChildrenTasks
+        .stream()
+        .filter(outEdge -> 
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
+        .filter(outEdge -> 
taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+        .forEach(outEdgeForThisVertex -> {
+          
additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(),
+              dataTransferFactory.createWriter(irVertex, taskIndex, 
outEdgeForThisVertex.getDstIRVertex(),
+                  outEdgeForThisVertex));
+        });
+
+    return additionalChildrenTaskWriters;
+  }
+
   private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex,
                                                    final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag,
                                                    final Map<String, 
VertexHarness> vertexIdToHarness) {
@@ -485,12 +536,20 @@ private void setIRVertexPutOnHold(final 
MetricCollectionBarrierVertex irVertex)
   private void finalizeOutputWriters(final VertexHarness vertexHarness) {
     final List<Long> writtenBytesList = new ArrayList<>();
 
-    vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> {
+    // finalize OutputWriters for main children
+    vertexHarness.getWritersToMainChildrenTasks().forEach(outputWriter -> {
       outputWriter.close();
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
     });
 
+    // finalize OutputWriters for additional tagged children
+    
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter
 -> {
+      outputWriter.close();
+      final Optional<Long> writtennBytes = outputWriter.getWrittenBytes();
+      writtennBytes.ifPresent(writtenBytesList::add);
+    });
+
     long totalWrittenBytes = 0;
     for (final Long writtenBytes : writtenBytesList) {
       totalWrittenBytes += writtenBytes;
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
index c5f9a7850..c79b53020 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
@@ -38,14 +38,16 @@
   private final List<VertexHarness> sideInputChildren;
   private final List<VertexHarness> nonSideInputChildren;
   private final Map<String, VertexHarness> additionalTagOutputChildren;
-  private final List<OutputWriter> writersToChildrenTasks;
+  private final List<OutputWriter> writersToMainChildrenTasks;
+  private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
 
   VertexHarness(final IRVertex irVertex,
                 final OutputCollectorImpl outputCollector,
                 final List<VertexHarness> children,
                 final List<Boolean> isSideInputs,
                 final List<Boolean> isAdditionalTagOutputs,
-                final List<OutputWriter> writersToChildrenTasks,
+                final List<OutputWriter> writersToMainChildrenTasks,
+                final Map<String, OutputWriter> 
writersToAdditionalChildrenTasks,
                 final Transform.Context context) {
     this.irVertex = irVertex;
     this.outputCollector = outputCollector;
@@ -71,7 +73,8 @@
     this.sideInputChildren = sides;
     this.nonSideInputChildren = nonSides;
     this.additionalTagOutputChildren = tagged;
-    this.writersToChildrenTasks = writersToChildrenTasks;
+    this.writersToMainChildrenTasks = writersToMainChildrenTasks;
+    this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks;
     this.context = context;
   }
 
@@ -111,10 +114,17 @@ OutputCollectorImpl getOutputCollector() {
   }
 
   /**
-   * @return OutputWriters of this irVertex. (empty if none exists)
+   * @return OutputWriters for main outputs of this irVertex. (empty if none 
exists)
    */
-  List<OutputWriter> getWritersToChildrenTasks() {
-    return writersToChildrenTasks;
+  List<OutputWriter> getWritersToMainChildrenTasks() {
+    return writersToMainChildrenTasks;
+  }
+
+  /**
+   * @return OutputWriters for additional tagged outputs of this irVertex. 
(empty if none exists)
+   */
+  Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() {
+    return writersToAdditionalChildrenTasks;
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to