johnyangk closed pull request #90: [NEMO-81] Support the Beam 'Partition' transform. URL: https://github.com/apache/incubator-nemo/pull/90
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java new file mode 100644 index 000000000..61c6b1ddc --- /dev/null +++ b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PerPercentileAverage.java @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.examples.beam; + +import com.google.common.collect.Lists; +import edu.snu.nemo.compiler.frontend.beam.NemoPipelineOptions; +import edu.snu.nemo.compiler.frontend.beam.NemoPipelineRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + +import java.io.Serializable; +import java.util.List; + +/** + * Per percentile statistics application. 
+ */ +public final class PerPercentileAverage { + /** + * Private Constructor. + */ + private PerPercentileAverage() { + } + + /** + * Main function for the MR BEAM program. + * + * @param args arguments. + */ + public static void main(final String[] args) { + final String inputFilePath = args[0]; + final String outputFilePath = args[1]; + final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); + options.setRunner(NemoPipelineRunner.class); + options.setJobName("PerPercentileAverage"); + + final Pipeline p = Pipeline.create(options); + + PCollection<Student> students = GenericSourceSink.read(p, inputFilePath) + .apply(ParDo.of(new DoFn<String, Student>() { + @ProcessElement + public void processElement(final ProcessContext c) { + String[] line = c.element().split(" "); + c.output(new Student(Integer.parseInt(line[0]), Integer.parseInt(line[1]), Integer.parseInt(line[2]))); + } + })) + .setCoder(SerializableCoder.of(Student.class)); + + PCollectionList<Student> studentsByPercentile = + // Make sure that each partition contain at least one element. + // If there are empty PCollections, successive WriteFiles may fail. 
+ students.apply(Partition.of(10, new Partition.PartitionFn<Student>() { + public int partitionFor(final Student student, final int numPartitions) { + return student.getPercentile() / numPartitions; + } + })); + + PCollection<String> [] results = new PCollection[10]; + for (int i = 0; i < 10; i++) { + results[i] = studentsByPercentile.get(i) + .apply(MapElements.via(new SimpleFunction<Student, KV<String, Integer>>() { + @Override + public KV<String, Integer> apply(final Student student) { + return KV.of("", student.getScore()); + } + })) + .apply(GroupByKey.create()) + .apply(MapElements.via(new SimpleFunction<KV<String, Iterable<Integer>>, String>() { + @Override + public String apply(final KV<String, Iterable<Integer>> kv) { + List<Integer> scores = Lists.newArrayList(kv.getValue()); + final int sum = scores.stream().reduce(0, (Integer x, Integer y) -> x + y); + return scores.size() + " " + (double) sum / scores.size(); + } + })); + GenericSourceSink.write(results[i], outputFilePath + "_" + i); + } + + p.run(); + } + + /** + * Student Class. 
+ */ + static class Student implements Serializable { + private int id; + private int percentile; + private int score; + + Student(final int id, final int percentile, final int score) { + this.id = id; + this.percentile = percentile; + this.score = score; + } + + public int getId() { + return id; + } + + public void setId(final int id) { + this.id = id; + } + + public int getPercentile() { + return percentile; + } + + public void setPercentile(final int percentile) { + this.percentile = percentile; + } + + public int getScore() { + return score; + } + + public void setScore(final int score) { + this.score = score; + } + } +} diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java new file mode 100644 index 000000000..eaeeddc31 --- /dev/null +++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PerPercentileAverageITCase.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.examples.beam; + +import edu.snu.nemo.client.JobLauncher; +import edu.snu.nemo.common.test.ArgBuilder; +import edu.snu.nemo.common.test.ExampleTestUtil; +import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Test PerPercentile Average program with JobLauncher. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(JobLauncher.class) +public final class PerPercentileAverageITCase { + private static final int TIMEOUT = 120000; + private static ArgBuilder builder; + private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/"; + + private static final String inputFileName = "test_input_partition"; + private static final String outputFileName = "test_output_partition"; + private static final String expectedOutputFileName = "expected_output_partition"; + private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json"; + private static final String inputFilePath = fileBasePath + inputFileName; + private static final String outputFilePath = fileBasePath + outputFileName; + + @Before + public void setUp() throws Exception { + builder = new ArgBuilder() + .addResourceJson(executorResourceFileName) + .addUserMain(PerPercentileAverage.class.getCanonicalName()) + .addUserArgs(inputFilePath, outputFilePath); + } + + @After + public void tearDown() throws Exception { + try { + for (int i = 0; i < 10; i++) { + ExampleTestUtil.ensureOutputValidity(fileBasePath, + outputFileName + "_" + i, + expectedOutputFileName + "_" + i); + } + } finally { + ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName); + } + } + + @Test (timeout = TIMEOUT) + public void test() throws Exception { + JobLauncher.main(builder + 
.addJobId(PerPercentileAverage.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } +} diff --git a/examples/resources/expected_output_partition_0 b/examples/resources/expected_output_partition_0 new file mode 100644 index 000000000..c03e1b065 --- /dev/null +++ b/examples/resources/expected_output_partition_0 @@ -0,0 +1 @@ +3 13.0 diff --git a/examples/resources/expected_output_partition_1 b/examples/resources/expected_output_partition_1 new file mode 100644 index 000000000..8bfe99f79 --- /dev/null +++ b/examples/resources/expected_output_partition_1 @@ -0,0 +1 @@ +2 22.0 diff --git a/examples/resources/expected_output_partition_2 b/examples/resources/expected_output_partition_2 new file mode 100644 index 000000000..6bd3675cd --- /dev/null +++ b/examples/resources/expected_output_partition_2 @@ -0,0 +1 @@ +2 30.0 diff --git a/examples/resources/expected_output_partition_3 b/examples/resources/expected_output_partition_3 new file mode 100644 index 000000000..c3a9fe47a --- /dev/null +++ b/examples/resources/expected_output_partition_3 @@ -0,0 +1 @@ +3 36.333333333333336 diff --git a/examples/resources/expected_output_partition_4 b/examples/resources/expected_output_partition_4 new file mode 100644 index 000000000..96f2bca89 --- /dev/null +++ b/examples/resources/expected_output_partition_4 @@ -0,0 +1 @@ +3 46.333333333333336 diff --git a/examples/resources/expected_output_partition_5 b/examples/resources/expected_output_partition_5 new file mode 100644 index 000000000..aed062c9b --- /dev/null +++ b/examples/resources/expected_output_partition_5 @@ -0,0 +1 @@ +2 62.0 diff --git a/examples/resources/expected_output_partition_6 b/examples/resources/expected_output_partition_6 new file mode 100644 index 000000000..86bf440ce --- /dev/null +++ b/examples/resources/expected_output_partition_6 @@ -0,0 +1 @@ +4 67.25 diff --git a/examples/resources/expected_output_partition_7 
b/examples/resources/expected_output_partition_7 new file mode 100644 index 000000000..e69de29bb diff --git a/examples/resources/expected_output_partition_8 b/examples/resources/expected_output_partition_8 new file mode 100644 index 000000000..d8d34a895 --- /dev/null +++ b/examples/resources/expected_output_partition_8 @@ -0,0 +1 @@ +4 89.75 diff --git a/examples/resources/expected_output_partition_9 b/examples/resources/expected_output_partition_9 new file mode 100644 index 000000000..15efcb051 --- /dev/null +++ b/examples/resources/expected_output_partition_9 @@ -0,0 +1 @@ +5 96.0 diff --git a/examples/resources/test_input_partition b/examples/resources/test_input_partition new file mode 100644 index 000000000..95ee9dcd9 --- /dev/null +++ b/examples/resources/test_input_partition @@ -0,0 +1,28 @@ +1 14 23 +2 49 57 +3 23 29 +5 37 37 +6 41 39 +7 57 63 +8 66 69 +9 91 92 +10 93 93 +11 97 98 +12 96 97 +13 82 90 +14 88 91 +15 7 13 +16 11 21 +17 3 9 +18 81 88 +19 99 100 +20 62 65 +21 83 90 +22 26 31 +23 31 34 +24 39 38 +25 44 43 +26 50 61 +27 61 64 +28 67 71 +29 8 17 \ No newline at end of file diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java index 68ba2c111..91c80d0e7 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java @@ -15,6 +15,8 @@ */ package edu.snu.nemo.runtime.executor.task; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.runtime.executor.data.DataUtil; import edu.snu.nemo.runtime.executor.datatransfer.InputReader; @@ -24,6 +26,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import 
java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; @@ -111,9 +114,17 @@ Object fetchDataElement() throws IOException { if (currentIteratorIndex == expectedNumOfIterators) { // Entire fetcher is done if (noElementAtAll) { - // This shouldn't normally happen, except for cases such as when Beam's VoidCoder is used. - noElementAtAll = false; - return Void.TYPE; + final Optional<DecoderFactory> decoderFactory = + readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class); + + // TODO #173: Properly handle zero-element task outputs. Currently fetchDataElement relies on + // toString() method to distinguish whether to return Void.TYPE or not. + if (decoderFactory.get().toString().equals("VoidCoder")) { + noElementAtAll = false; + return Void.TYPE; + } else { + return null; + } } else { // This whole fetcher's done return null; diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java index 931e7515f..3f1fd3d2c 100644 --- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java @@ -15,7 +15,11 @@ */ package edu.snu.nemo.runtime.executor.task; +import edu.snu.nemo.common.coder.DecoderFactory; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap; import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.runtime.common.plan.RuntimeEdge; import edu.snu.nemo.runtime.executor.data.DataUtil; import edu.snu.nemo.runtime.executor.datatransfer.InputReader; import org.junit.Test; @@ -31,6 +35,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import 
static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.when; /** @@ -41,10 +46,11 @@ public final class ParentTaskDataFetcherTest { @Test(timeout=5000) - public void testEmpty() throws Exception { - // InputReader + public void testVoid() throws Exception { + // TODO #173: Properly handle zero-element. This test should be updated too. final List<String> dataElements = new ArrayList<>(0); // empty data - final InputReader inputReader = generateInputReader(generateCompletableFuture(dataElements.iterator())); + final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()), + "VoidCoder"); // Fetcher final ParentTaskDataFetcher fetcher = createFetcher(inputReader); @@ -53,6 +59,20 @@ public void testEmpty() throws Exception { assertEquals(Void.TYPE, fetcher.fetchDataElement()); } + @Test(timeout=5000) + public void testEmpty() throws Exception { + // TODO #173: Properly handle zero-element. This test should be updated too. 
+ final List<String> dataElements = new ArrayList<>(0); // empty data + final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()), + "IntCoder"); + + // Fetcher + final ParentTaskDataFetcher fetcher = createFetcher(inputReader); + + // Should return Void.TYPE + assertEquals(null, fetcher.fetchDataElement()); + } + @Test(timeout=5000) public void testNonEmpty() throws Exception { // InputReader @@ -111,6 +131,28 @@ private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTas false); } + + private DecoderFactory generateCoder(final String coder) { + final DecoderFactory decoderFactory = mock(DecoderFactory.class); + when(decoderFactory.toString()).thenReturn(coder); + return decoderFactory; + } + + private RuntimeEdge generateEdge(final String coder) { + final String runtimeIREdgeId = "Runtime edge with coder"; + final ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId); + edgeProperties.put(DecoderProperty.of(generateCoder(coder))); + return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, mock(IRVertex.class), mock(IRVertex.class), false); + } + + private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture, final String coder) { + final InputReader inputReader = mock(InputReader.class); + when(inputReader.read()).thenReturn(Arrays.asList(completableFuture)); + final RuntimeEdge runtimeEdge = generateEdge(coder); + when(inputReader.getRuntimeEdge()).thenReturn(runtimeEdge); + return inputReader; + } + private InputReader generateInputReader(final CompletableFuture completableFuture) { final InputReader inputReader = mock(InputReader.class); when(inputReader.read()).thenReturn(Arrays.asList(completableFuture)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [email protected]. With regards, Apache Git Services
