shpark closed pull request #76: [NEMO-151] Add OutputWriters for additional tagged outputs
URL: https://github.com/apache/incubator-nemo/pull/76
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
index 1abe09278..661f151d8 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SailfishRelayReshapingPass.java
@@ -23,10 +23,12 @@
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.AdditionalTagOutputProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.SkipSerDesProperty;
import edu.snu.nemo.common.ir.vertex.transform.RelayTransform;
import java.util.Collections;
+import java.util.HashMap;
/**
* Pass to modify the DAG for a job to batch the disk seek.
@@ -60,6 +62,21 @@ public SailfishRelayReshapingPass() {
// before the vertex receiving shuffled data.
final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
iFileMergerVertex.getExecutionProperties().put(SkipSerDesProperty.of());
+
+ // Update additional tagged output property of the source vertex if necessary.
+ if (edge.getSrc().getPropertyValue(AdditionalTagOutputProperty.class).isPresent()) {
+   final HashMap<String, String> additionalTagOutputPropertyValue =
+       edge.getSrc().getPropertyValue(AdditionalTagOutputProperty.class).get();
+   if (additionalTagOutputPropertyValue.containsValue(v.getId())) {
+     additionalTagOutputPropertyValue.forEach((tag, vertexId) -> {
+       if (vertexId.equals(v.getId())) {
+         additionalTagOutputPropertyValue.replace(tag, iFileMergerVertex.getId());
+       }
+     });
+     edge.getSrc().setProperty(AdditionalTagOutputProperty.of(additionalTagOutputPropertyValue));
+   }
+ }
+
builder.addVertex(iFileMergerVertex);
final IREdge newEdgeToMerger = new IREdge(DataCommunicationPatternProperty.Value.Shuffle,
edge.getSrc(), iFileMergerVertex, edge.isSideInput());
diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
index 5613e3ca6..d94797a49 100644
--- a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
@@ -20,10 +20,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import java.util.Arrays;
@@ -52,11 +49,11 @@ public static void main(final String[] args) {
// {} here is required for preserving type information.
// Please see https://stackoverflow.com/a/48431397 for details.
- final TupleTag<String> shortWordsTag = new TupleTag<String>() {
+ final TupleTag<KV<Integer, String>> shortWordsTag = new TupleTag<KV<Integer, String>>("short") {
};
- final TupleTag<Integer> longWordsTag = new TupleTag<Integer>() {
+ final TupleTag<KV<Integer, String>> longWordsTag = new TupleTag<KV<Integer, String>>("long") {
};
- final TupleTag<String> veryLongWordsTag = new TupleTag<String>() {
+ final TupleTag<String> veryLongWordsTag = new TupleTag<String>("very long") {
};
final Pipeline p = Pipeline.create(options);
@@ -70,27 +67,44 @@ public static void main(final String[] args) {
@ProcessElement
public void processElement(final ProcessContext c) {
String word = c.element();
- if (word.length() < 5) {
- c.output(shortWordsTag, word);
- } else if (word.length() < 8) {
- c.output(longWordsTag, word.length());
+ if (word.length() < 6) {
+ c.output(shortWordsTag, KV.of(word.length(), word));
+ } else if (word.length() < 11) {
+ c.output(longWordsTag, KV.of(word.length(), word));
} else {
- c.output(veryLongWordsTag, word);
+ c.output(word);
}
}
}).withOutputTags(veryLongWordsTag, TupleTagList
- .of(longWordsTag)
- .and(shortWordsTag)));
+ .of(shortWordsTag).and(longWordsTag)));
- PCollection<String> shortWords = results.get(shortWordsTag);
- PCollection<String> longWordLengths = results
- .get(longWordsTag)
- .apply(MapElements.into(TypeDescriptors.strings()).via(i -> Integer.toString(i)));
+ PCollection<String> shortWords = results.get(shortWordsTag)
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(new FormatLines()));
+ PCollection<String> longWords = results.get(longWordsTag)
+ .apply(GroupByKey.create())
+ .apply(MapElements.via(new FormatLines()));
PCollection<String> veryLongWords = results.get(veryLongWordsTag);
GenericSourceSink.write(shortWords, outputFilePath + "_short");
- GenericSourceSink.write(longWordLengths, outputFilePath + "_long");
+ GenericSourceSink.write(longWords, outputFilePath + "_long");
GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long");
p.run();
}
+
+ /**
+ * Formats a key-value pair to a string.
+ */
+ static class FormatLines extends SimpleFunction<KV<Integer, Iterable<String>>, String> {
+ @Override
+ public String apply(final KV<Integer, Iterable<String>> input) {
+ final int length = input.getKey();
+ final StringBuilder sb = new StringBuilder();
+ for (final String word : input.getValue()) {
+ sb.append(length).append(": ").append(word).append('\n');
+ }
+
+ return sb.toString();
+ }
+ }
}
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
index 31c40ef90..f19164ba5 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
@@ -19,6 +19,8 @@
import edu.snu.nemo.common.test.ArgBuilder;
import edu.snu.nemo.common.test.ExampleTestUtil;
import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.SailfishPolicyParallelismFive;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,9 +48,9 @@
@Before
public void setUp() throws Exception {
builder = new ArgBuilder()
- .addResourceJson(executorResourceFileName)
- .addUserMain(PartitionWordsByLength.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath);
+ .addResourceJson(executorResourceFileName)
+ .addUserMain(PartitionWordsByLength.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath);
}
@After
@@ -65,8 +67,16 @@ public void tearDown() throws Exception {
@Test (timeout = TIMEOUT)
public void test() throws Exception {
JobLauncher.main(builder
- .addJobId(PartitionWordsByLength.class.getSimpleName())
- .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
- .build());
+ .addJobId(PartitionWordsByLengthITCase.class.getSimpleName())
+ .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
+ @Test (timeout = TIMEOUT)
+ public void testSailfish() throws Exception {
+ JobLauncher.main(builder
+ .addJobId(PartitionWordsByLengthITCase.class.getSimpleName() + "_sailfish")
+ .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+ .build());
}
}
diff --git a/examples/resources/sample_input_tag b/examples/resources/sample_input_tag
index 0cd417beb..37ac499d3 100644
--- a/examples/resources/sample_input_tag
+++ b/examples/resources/sample_input_tag
@@ -1,7 +1,18 @@
+a
+to
foo
bar
+that
+ipsum
+dolor
foobar
barbaz
+abcdefg
+fooipsum
foobarbaz
+loren
+this
+foobarbazqux
+bazquxfoobar
+qux
ipsumlorem
-qux
\ No newline at end of file
diff --git a/examples/resources/test_output_tag_long b/examples/resources/test_output_tag_long
index 91dea2c76..78c88bfda 100644
--- a/examples/resources/test_output_tag_long
+++ b/examples/resources/test_output_tag_long
@@ -1,2 +1,11 @@
-6
-6
+6: foobar
+6: barbaz
+
+7: abcdefg
+
+8: fooipsum
+
+9: foobarbaz
+
+10: ipsumlorem
+
diff --git a/examples/resources/test_output_tag_short b/examples/resources/test_output_tag_short
index 72594ed96..d9d497b49 100644
--- a/examples/resources/test_output_tag_short
+++ b/examples/resources/test_output_tag_short
@@ -1,3 +1,15 @@
-foo
-bar
-qux
+1: a
+
+2: to
+
+3: foo
+3: bar
+3: qux
+
+4: that
+4: this
+
+5: ipsum
+5: dolor
+5: loren
+
diff --git a/examples/resources/test_output_tag_very_long b/examples/resources/test_output_tag_very_long
index 22a28156a..f562b43e6 100644
--- a/examples/resources/test_output_tag_very_long
+++ b/examples/resources/test_output_tag_very_long
@@ -1,2 +1,2 @@
-foobarbaz
-ipsumlorem
+foobarbazqux
+bazquxfoobar
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 75207302d..f9f1870ae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -162,13 +162,18 @@ public TaskExecutor(final Task task,
.collect(Collectors.toList());
// Handle writes
- final List<OutputWriter> childrenTaskWriters = getChildrenTaskWriters(
- taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory); // Children-task write
+ // Main output children task writes
+ final List<OutputWriter> mainChildrenTaskWriters = getMainChildrenTaskWriters(
+ taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap);
+ // Additional output children task writes
+ final Map<String, OutputWriter> additionalChildrenTaskWriters = getAdditionalChildrenTaskWriters(
+ taskIndex, irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap);
final List<String> additionalOutputVertices = new ArrayList<>(additionalOutputMap.values());
final OutputCollectorImpl oci = new OutputCollectorImpl(additionalOutputVertices);
+ // intra-vertex writes
final VertexHarness vertexHarness = new VertexHarness(irVertex, oci, children,
- isToSideInputs, isToAdditionalTagOutputs,
- childrenTaskWriters, new ContextImpl(sideInputMap, additionalOutputMap)); // Intra-vertex write
+ isToSideInputs, isToAdditionalTagOutputs, mainChildrenTaskWriters, additionalChildrenTaskWriters,
+ new ContextImpl(sideInputMap, additionalOutputMap));
prepareTransform(vertexHarness);
vertexIdToHarness.put(irVertex.getId(), vertexHarness);
@@ -222,11 +227,13 @@ private void processElementRecursively(final VertexHarness vertexHarness, final
}
// Recursively process all of the additional output elements.
- vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
- while (!outputCollector.isEmpty(tag)) {
- final Object element = outputCollector.remove(tag);
- handleAdditionalOutputElement(vertexHarness, element, tag); // Recursion
- }
+ vertexHarness.getIRVertex().getPropertyValue(AdditionalTagOutputProperty.class).ifPresent(tagToVertex -> {
+ tagToVertex.values().forEach(dstVertexId -> {
+ while (!outputCollector.isEmpty(dstVertexId)) {
+ final Object element = outputCollector.remove(dstVertexId);
+ handleAdditionalOutputElement(vertexHarness, element, dstVertexId); // Recursion
+ }
+ });
});
}
@@ -317,28 +324,49 @@ private void doExecute() {
private void finalizeVertex(final VertexHarness vertexHarness) {
closeTransform(vertexHarness);
- while (!vertexHarness.getOutputCollector().isEmpty()) {
- final Object element = vertexHarness.getOutputCollector().remove();
+ final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
+
+ // handle main outputs
+ while (!outputCollector.isEmpty()) {
+ final Object element = outputCollector.remove();
handleMainOutputElement(vertexHarness, element);
}
+
+ // handle additional tagged outputs
+ vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
+ while (!outputCollector.isEmpty(tag)) {
+ final Object element = outputCollector.remove(tag);
+ handleAdditionalOutputElement(vertexHarness, element, tag);
+ }
+ });
finalizeOutputWriters(vertexHarness);
}
private void handleMainOutputElement(final VertexHarness harness, final Object element) {
- harness.getWritersToChildrenTasks().forEach(outputWriter -> {
+ // writes to children tasks
+ harness.getWritersToMainChildrenTasks().forEach(outputWriter -> {
outputWriter.write(element);
});
+ // writes to side input children tasks
if (harness.getSideInputChildren().size() > 0) {
sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
}
+ // process elements in the next vertices within a task
harness.getNonSideInputChildren().forEach(child -> processElementRecursively(child, element));
}
private void handleAdditionalOutputElement(final VertexHarness harness, final Object element, final String tag) {
- // Inter-task writes are currently not supported.
+ // writes to additional children tasks
+ harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
+ .filter(kv -> kv.getKey().equals(tag))
+ .forEach(kv -> {
+ kv.getValue().write(element);
+ });
+ // writes to side input children tasks
if (harness.getSideInputChildren().size() > 0) {
sideInputMap.put(((OperatorVertex) harness.getIRVertex()).getTransform().getTag(), element);
}
+ // process elements in the next vertices within a task
harness.getAdditionalTagOutputChildren().entrySet().stream()
.filter(kv -> kv.getKey().equals(tag))
.forEach(kv -> processElementRecursively(kv.getValue(), element));
@@ -354,7 +382,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
// For this looping of available fetchers.
int finishedFetcherIndex = NONE_FINISHED;
for (int i = 0; i < availableFetchers.size(); i++) {
- final DataFetcher dataFetcher = fetchers.get(i);
+ final DataFetcher dataFetcher = availableFetchers.get(i);
final Object element;
try {
element = dataFetcher.fetchDataElement();
@@ -418,18 +446,58 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
.collect(Collectors.toList());
}
- private List<OutputWriter> getChildrenTaskWriters(final int taskIndex,
- final IRVertex irVertex,
- final List<StageEdge> outEdgesToChildrenTasks,
- final DataTransferFactory dataTransferFactory) {
+ /**
+ * Return inter-task OutputWriters, for single output or output associated with main tag.
+ * @param taskIndex current task index
+ * @param irVertex source irVertex
+ * @param outEdgesToChildrenTasks outgoing edges to child tasks
+ * @param dataTransferFactory dataTransferFactory
+ * @param taggedOutputs tag to vertex id map
+ * @return OutputWriters for main children tasks
+ */
+ private List<OutputWriter> getMainChildrenTaskWriters(final int taskIndex,
+ final IRVertex irVertex,
+ final List<StageEdge> outEdgesToChildrenTasks,
+ final DataTransferFactory dataTransferFactory,
+ final Map<String, String> taggedOutputs) {
return outEdgesToChildrenTasks
.stream()
.filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
+ .filter(outEdge -> !taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
.map(outEdgeForThisVertex -> dataTransferFactory
.createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
.collect(Collectors.toList());
}
+ /**
+ * Return inter-task OutputWriters associated with additional output tags.
+ * @param taskIndex current task index
+ * @param irVertex source irVertex
+ * @param outEdgesToChildrenTasks outgoing edges to child tasks
+ * @param dataTransferFactory dataTransferFactory
+ * @param taggedOutputs tag to vertex id map
+ * @return additional children vertex id to OutputWriters map.
+ */
+ private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final int taskIndex,
+ final IRVertex irVertex,
+ final List<StageEdge> outEdgesToChildrenTasks,
+ final DataTransferFactory dataTransferFactory,
+ final Map<String, String> taggedOutputs) {
+ final Map<String, OutputWriter> additionalChildrenTaskWriters = new HashMap<>();
+
+ outEdgesToChildrenTasks
+ .stream()
+ .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
+ .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+ .forEach(outEdgeForThisVertex -> {
+ additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(),
+ dataTransferFactory.createWriter(irVertex, taskIndex, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
+ });
+
+ return additionalChildrenTaskWriters;
+ }
+
private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final Map<String, VertexHarness> vertexIdToHarness) {
@@ -486,12 +554,20 @@ private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex)
private void finalizeOutputWriters(final VertexHarness vertexHarness) {
final List<Long> writtenBytesList = new ArrayList<>();
- vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> {
+ // finalize OutputWriters for main children
+ vertexHarness.getWritersToMainChildrenTasks().forEach(outputWriter -> {
outputWriter.close();
final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
writtenBytes.ifPresent(writtenBytesList::add);
});
+ // finalize OutputWriters for additional tagged children
+ vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> {
+ outputWriter.close();
+ final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
+ writtenBytes.ifPresent(writtenBytesList::add);
+ });
+
long totalWrittenBytes = 0;
for (final Long writtenBytes : writtenBytesList) {
totalWrittenBytes += writtenBytes;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
index c5f9a7850..c79b53020 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
@@ -38,14 +38,16 @@
private final List<VertexHarness> sideInputChildren;
private final List<VertexHarness> nonSideInputChildren;
private final Map<String, VertexHarness> additionalTagOutputChildren;
- private final List<OutputWriter> writersToChildrenTasks;
+ private final List<OutputWriter> writersToMainChildrenTasks;
+ private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
VertexHarness(final IRVertex irVertex,
final OutputCollectorImpl outputCollector,
final List<VertexHarness> children,
final List<Boolean> isSideInputs,
final List<Boolean> isAdditionalTagOutputs,
- final List<OutputWriter> writersToChildrenTasks,
+ final List<OutputWriter> writersToMainChildrenTasks,
+ final Map<String, OutputWriter> writersToAdditionalChildrenTasks,
final Transform.Context context) {
this.irVertex = irVertex;
this.outputCollector = outputCollector;
@@ -71,7 +73,8 @@
this.sideInputChildren = sides;
this.nonSideInputChildren = nonSides;
this.additionalTagOutputChildren = tagged;
- this.writersToChildrenTasks = writersToChildrenTasks;
+ this.writersToMainChildrenTasks = writersToMainChildrenTasks;
+ this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks;
this.context = context;
}
@@ -111,10 +114,17 @@ OutputCollectorImpl getOutputCollector() {
}
/**
- * @return OutputWriters of this irVertex. (empty if none exists)
+ * @return OutputWriters for main outputs of this irVertex. (empty if none exists)
*/
- List<OutputWriter> getWritersToChildrenTasks() {
- return writersToChildrenTasks;
+ List<OutputWriter> getWritersToMainChildrenTasks() {
+ return writersToMainChildrenTasks;
+ }
+
+ /**
+ * @return OutputWriters for additional tagged outputs of this irVertex. (empty if none exists)
+ */
+ Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() {
+ return writersToAdditionalChildrenTasks;
}
/**
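For context, the user-facing pattern this PR exercises is Beam's additional (tagged) outputs, which the new OutputWriters route between tasks on the executor side. Below is a minimal standalone sketch of that pattern, assuming a plain Beam pipeline; the class name TaggedOutputSketch, the tag names, the sample elements, and the output path prefixes are illustrative only and are not part of this PR:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public final class TaggedOutputSketch {
  public static void main(final String[] args) {
    // {} subclasses preserve type information, as noted in PartitionWordsByLength above.
    final TupleTag<String> mainTag = new TupleTag<String>("main") {
    };
    final TupleTag<String> sideTag = new TupleTag<String>("side") {
    };

    final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    final PCollectionTuple results = p
        .apply(Create.of("foo", "foobar", "foobarbaz"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(final ProcessContext c) {
            if (c.element().length() < 6) {
              c.output(c.element());          // main output
            } else {
              c.output(sideTag, c.element()); // additional tagged output
            }
          }
        }).withOutputTags(mainTag, TupleTagList.of(sideTag)));

    // Each tagged PCollection becomes a separate downstream vertex; at runtime the
    // executor writes each one through its own OutputWriter, which is what this PR adds.
    final PCollection<String> shortWords = results.get(mainTag);
    final PCollection<String> longWords = results.get(sideTag);
    shortWords.apply(TextIO.write().to("tagged_output_short"));
    longWords.apply(TextIO.write().to("tagged_output_long"));
    p.run();
  }
}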
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services