http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java index bc026c9..6fe549f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java @@ -18,82 +18,68 @@ package org.apache.flink.test.exampleScalaPrograms; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; +import java.io.File; -import org.apache.flink.configuration.Configuration; +import com.google.common.base.Charsets; +import com.google.common.io.Files; import org.apache.flink.examples.scala.graph.PageRankBasic; import org.apache.flink.test.testdata.PageRankData; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class PageRankITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - +public class PageRankITCase extends MultipleProgramsTestBase { + + public PageRankITCase(ExecutionMode mode){ + super(mode); + } + private String verticesPath; private String edgesPath; private String resultPath; - private String expectedResult; - - public PageRankITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES); - edgesPath = createTempFile("edges.txt", PageRankData.EDGES); - } + private String expected; - @Override - protected void testProgram() throws Exception { - expectedResult = runProgram(curProgId); - } - - @Override - protected void postSubmit() throws Exception { - compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + File resultFile = tempFolder.newFile(); + //Delete file because the Scala API does not respect WriteMode set by the configuration + resultFile.delete(); + resultPath = resultFile.toURI().toString(); + + File verticesFile = tempFolder.newFile(); + Files.write(PageRankData.VERTICES, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @After + public void after() throws Exception{ + compareKeyValueParisWithDelta(expected, resultPath, " ", 0.01); + } - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); + @Test + public void testPageRankWithSmallNumberOfIterations() throws Exception { + PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"}); + expected = PageRankData.RANKS_AFTER_3_ITERATIONS; } - - public String runProgram(int progId) throws Exception { - - switch(progId) { - case 1: { - PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"}); - return PageRankData.RANKS_AFTER_3_ITERATIONS; - } - case 2: { - // start with a very high number of iteration such that the dynamic convergence criterion must handle termination - PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); - return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; - } - - default: - throw new IllegalArgumentException("Invalid program id"); - } + @Test + public void testPageRankWithConvergence() throws Exception { + // start with a very high number of iteration such that the dynamic convergence criterion must handle termination + PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"}); + expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE; } }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java index 674ca49..aae7168 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java @@ -18,12 +18,10 @@ package org.apache.flink.test.iterative.aggregators; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; import java.util.Random; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; import org.junit.Assert; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; @@ -33,12 +31,14 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; @@ -49,213 +49,185 @@ import org.apache.flink.api.java.operators.IterativeDataSet; * */ @RunWith(Parameterized.class) -public class AggregatorsITCase extends JavaProgramTestBase { +public class AggregatorsITCase extends MultipleProgramsTestBase { - private static final int NUM_PROGRAMS = 5; - private static final int MAX_ITERATIONS = 20; + private static final int MAX_ITERATIONS = 20; private static final int DOP = 2; + private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements"; + + public AggregatorsITCase(ExecutionMode mode){ + super(mode); + } - private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; - private String expectedResult; + private String expected; - public AggregatorsITCase(Configuration config) { - super(config); - } + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - @Override - protected void testProgram() throws Exception { - expectedResult = AggregatorProgs.runProgram(curProgId, resultPath); + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); } - @Override - protected void postSubmit() throws Exception { + @Test + public void testAggregatorWithoutParameterForIterate() throws Exception { + /* + * Test aggregator without parameter for iterate + */ - compareResultsByLinesInMemory(expectedResult, resultPath); - } + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DOP); - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); + IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + // register aggregator + LongSumAggregator aggr = new LongSumAggregator(); + iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + // register convergence criterion + iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, + new NegativeElementsConvergenceCriterion()); + + DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap()); + iteration.closeWith(updatedDs).writeAsText(resultPath); + env.execute(); - return toParameterList(tConfigs); + expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" + + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; } - private static class AggregatorProgs { + @Test + public void testAggregatorWithParameterForIterate() throws Exception { + /* + * Test aggregator with parameter for iterate + */ - private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements"; + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DOP); - public static String runProgram(int progId, String resultPath) throws Exception { + DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); + IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); - switch(progId) { - case 1: { - /* - * Test aggregator without parameter for iterate - */ + // register aggregator + LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0); + iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DOP); - - DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); - IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); - - // register aggregator - LongSumAggregator aggr = new LongSumAggregator(); - iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - - // register convergence criterion - iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, - new NegativeElementsConvergenceCriterion()); - - DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); - - // return expected result - return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; - } - case 2: { - /* - * Test aggregator with parameter for iterate - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DOP); - - DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); - IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); - - // register aggregator - LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0); - iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - - // register convergence criterion - iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, - new NegativeElementsConvergenceCriterion()); - - DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); - - // return expected result - return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; - } - case 3: { - /* + // register convergence criterion + iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, + new NegativeElementsConvergenceCriterion()); + + DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam()); + iteration.closeWith(updatedDs).writeAsText(resultPath); + env.execute(); + + expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" + + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + } + + @Test + public void testConvergenceCriterionWithParameterForIterate() throws Exception { + /* * Test convergence criterion with parameter for iterate */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DOP); - - DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); - IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); - - // register aggregator - LongSumAggregator aggr = new LongSumAggregator(); - iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - - // register convergence criterion - iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, - new NegativeElementsConvergenceCriterionWithParam(3)); - - DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); - - // return expected result - return "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" - + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" - + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; - } - case 4: { - /* - * Test aggregator without parameter for iterateDelta - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DOP); - - DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap()); - - DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta( - initialSolutionSet, MAX_ITERATIONS, 0); - - // register aggregator - LongSumAggregator aggr = new LongSumAggregator(); - iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - - DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta()); - - DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet()) - .where(0).equalTo(0).flatMap(new UpdateFilter()); - - DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements); - DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper()); - result.writeAsText(resultPath); - - env.execute(); - - // return expected result - return "1\n" + "2\n" + "2\n" + "3\n" + "3\n" - + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" - + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; - - } - case 5: { - /* - * Test aggregator with parameter for iterateDelta - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(DOP); - - DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap()); - - DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta( - initialSolutionSet, MAX_ITERATIONS, 0); - - // register aggregator - LongSumAggregator aggr = new LongSumAggregatorWithParameter(4); - iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); - - DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta()); - - DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet()) - .where(0).equalTo(0).flatMap(new UpdateFilter()); - - DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements); - DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper()); - result.writeAsText(resultPath); - - env.execute(); - - // return expected result - return "1\n" + "2\n" + "2\n" + "3\n" + "3\n" - + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" - + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - } + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DOP); + + DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env); + IterativeDataSet<Integer> iteration = initialSolutionSet.iterate(MAX_ITERATIONS); + + // register aggregator + LongSumAggregator aggr = new LongSumAggregator(); + iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); + + // register convergence criterion + iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, aggr, + new NegativeElementsConvergenceCriterionWithParam(3)); + + DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap()); + iteration.closeWith(updatedDs).writeAsText(resultPath); + env.execute(); + + expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n" + + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n" + + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; + } + + @Test + public void testAggregatorWithoutParameterForIterateDelta() throws Exception { + /* + * Test aggregator without parameter for iterateDelta + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DOP); + + DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap()); + + DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta( + initialSolutionSet, MAX_ITERATIONS, 0); + + // register aggregator + LongSumAggregator aggr = new LongSumAggregator(); + iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); + + DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta()); + + DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet()) + .where(0).equalTo(0).flatMap(new UpdateFilter()); + + DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements); + DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper()); + result.writeAsText(resultPath); + + env.execute(); + + expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; + } + + @Test + public void testAggregatorWithParameterForIterateDelta() throws Exception { + /* + * Test aggregator with parameter for iterateDelta + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(DOP); + + DataSet<Tuple2<Integer, Integer>> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map(new TupleMakerMap()); + + DeltaIteration<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> iteration = initialSolutionSet.iterateDelta( + initialSolutionSet, MAX_ITERATIONS, 0); + + // register aggregator + LongSumAggregator aggr = new LongSumAggregatorWithParameter(4); + iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, aggr); + + DataSet<Tuple2<Integer, Integer>> updatedDs = iteration.getWorkset().map(new AggregateMapDelta()); + + DataSet<Tuple2<Integer, Integer>> newElements = updatedDs.join(iteration.getSolutionSet()) + .where(0).equalTo(0).flatMap(new UpdateFilter()); + + DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements); + DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper()); + result.writeAsText(resultPath); + + env.execute(); + + expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n" + + "3\n" + "4\n" + "4\n" + "4\n" + "4\n" + + "5\n" + "5\n" + "5\n" + "5\n" + "5\n"; } @SuppressWarnings("serial") @@ -294,7 +266,7 @@ public class AggregatorsITCase extends JavaProgramTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); } @Override @@ -316,7 +288,7 @@ public class AggregatorsITCase extends JavaProgramTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); } @Override @@ -366,11 +338,11 @@ public class AggregatorsITCase extends JavaProgramTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); superstep = getIterationRuntimeContext().getSuperstepNumber(); if (superstep > 1) { - previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR); // check previous aggregator value Assert.assertEquals(superstep - 1, previousAggr.getValue()); } @@ -429,11 +401,11 @@ public class AggregatorsITCase extends JavaProgramTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); superstep = getIterationRuntimeContext().getSuperstepNumber(); if (superstep > 1) { - previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorProgs.NEGATIVE_ELEMENTS_AGGR); + previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(NEGATIVE_ELEMENTS_AGGR); // check previous aggregator value switch(superstep) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java index 61ad863..3fbcae6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java @@ -18,135 +18,107 @@ package org.apache.flink.test.javaApiOperators; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) -public class AggregateITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 3; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public AggregateITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } +public class AggregateITCase extends MultipleProgramsTestBase { - @Override - protected void testProgram() throws Exception { - expectedResult = AggregateProgs.runProgram(curProgId, resultPath); + + public AggregateITCase(ExecutionMode mode){ + super(mode); } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); } - - private static class AggregateProgs { - - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* + + @Test + public void testFullAggregate() throws Exception { + /* * Full Aggregate */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<Integer, Long>> aggregateDs = ds - .aggregate(Aggregations.SUM, 0) - .and(Aggregations.MAX, 1) + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> aggregateDs = ds + .aggregate(Aggregations.SUM, 0) + .and(Aggregations.MAX, 1) .project(0, 1); - - aggregateDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "231,6\n"; - } - case 2: { - /* + + aggregateDs.writeAsCsv(resultPath); + env.execute(); + + expected = "231,6\n"; + } + + @Test + public void testGroupedAggregate() throws Exception { + /* * Grouped Aggregate */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1) - .aggregate(Aggregations.SUM, 0) + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.SUM, 0) .project(1, 0); - - aggregateDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1\n" + + + aggregateDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; - } - case 3: { - /* - * Nested Aggregate - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1) - .aggregate(Aggregations.MIN, 0) - .aggregate(Aggregations.MIN, 0) + } + + @Test + public void testNestedAggregate() throws Exception { + /* + * Nested Aggregate + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.MIN, 0) + .aggregate(Aggregations.MIN, 0) .project(0); - - aggregateDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - } + + aggregateDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1\n"; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index ffc208c..b249e22 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -18,11 +18,8 @@ package org.apache.flink.test.javaApiOperators; -import java.io.FileNotFoundException; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.functions.CoGroupFunction; @@ -39,478 +36,486 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.util.Collector; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class CoGroupITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 13; - - private int curProgId = config.getInteger("ProgramId", -1); +public class CoGroupITCase extends MultipleProgramsTestBase { + + public CoGroupITCase(ExecutionMode mode){ + super(mode); + } + private String resultPath; - private String expectedResult; - - public CoGroupITCase(Configuration config) { - super(config); + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); } - @Override - protected void testProgram() throws Exception { - expectedResult = CoGroupProgs.runProgram(curProgId, resultPath); + @Test + public void testCoGroupTuplesWithKeyFieldSelector() throws Exception { + /* + * CoGroup on tuples with key field selector + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup()); + + coGroupDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,0\n" + + "2,6\n" + + "3,24\n" + + "4,60\n" + + "5,120\n"; } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + + @Test + public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception { + /* + * CoGroup on two custom type inputs with key extractors + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new + KeySelector5()).with(new CustomTypeCoGroup()); + + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "1,0,test\n" + + "2,6,test\n" + + "3,24,test\n" + + "4,60,test\n" + + "5,120,test\n" + + "6,210,test\n"; } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + public static class KeySelector4 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); + public static class KeySelector5 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; } - - return toParameterList(tConfigs); } - - private static class CoGroupProgs { - - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - - /* - * CoGroup on tuples with key field selector - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup()); - - coGroupDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,0\n" + - "2,6\n" + - "3,24\n" + - "4,60\n" + - "5,120\n"; - } - case 2: { - - /* - * CoGroup on two custom type inputs with key extractors - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).equalTo(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).with(new CustomTypeCoGroup()); - - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "1,0,test\n" + - "2,6,test\n" + - "3,24,test\n" + - "4,60,test\n" + - "5,120,test\n" + - "6,210,test\n"; - } - case 3: { - - /* - * check correctness of cogroup if UDF returns left input objects - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft()); - - coGroupDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,Hi\n" + - "2,2,Hello\n" + - "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + - "5,3,I am fine.\n"; - - } - case 4: { - - /* - * check correctness of cogroup if UDF returns right input objects - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight()); - - coGroupDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,0,Hallo,1\n" + - "2,2,1,Hallo Welt,2\n" + - "2,3,2,Hallo Welt wie,1\n" + - "3,4,3,Hallo Welt wie gehts?,2\n" + - "3,5,4,ABC,2\n" + - "3,6,5,BCD,3\n"; - - } - case 5: { - - /* - * Reduce with broadcast set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints"); - - coGroupDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,0,55\n" + - "2,6,55\n" + - "3,24,55\n" + - "4,60,55\n" + - "5,120,55\n"; - } - case 6: { - - /* - * CoGroup on a tuple input with key field selector and a custom type input with key extractor - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).with(new MixedCoGroup()); - - coGroupDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "0,1,test\n" + - "1,2,test\n" + - "2,5,test\n" + - "3,15,test\n" + - "4,33,test\n" + - "5,63,test\n" + - "6,109,test\n" + - "7,4,test\n" + - "8,4,test\n" + - "9,4,test\n" + - "10,5,test\n" + - "11,5,test\n" + - "12,5,test\n" + - "13,5,test\n" + - "14,5,test\n"; - - } - case 7: { - - /* - * CoGroup on a tuple input with key field selector and a custom type input with key extractor - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).equalTo(2).with(new MixedCoGroup2()); - - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "0,1,test\n" + - "1,2,test\n" + - "2,5,test\n" + - "3,15,test\n" + - "4,33,test\n" + - "5,63,test\n" + - "6,109,test\n" + - "7,4,test\n" + - "8,4,test\n" + - "9,4,test\n" + - "10,5,test\n" + - "11,5,test\n" + - "12,5,test\n" + - "13,5,test\n" + - "14,5,test\n"; - - } - case 8: { - /* - * CoGroup with multiple key fields - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); - - DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). - where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup()); - - coGrouped.writeAsCsv(resultPath); - env.execute(); - - return "1,1,Hallo\n" + - "2,2,Hallo Welt\n" + - "3,2,Hallo Welt wie gehts?\n" + - "3,2,ABC\n" + - "5,3,HIJ\n" + - "5,3,IJK\n"; - } - case 9: { - /* - * CoGroup with multiple key fields - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); - - DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). - where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) { - return new Tuple2<Integer, Long>(t.f0, t.f4); - } - }). - equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) { - return new Tuple2<Integer, Long>(t.f0, t.f1); - } - }).with(new Tuple5Tuple3CoGroup()); - - coGrouped.writeAsCsv(resultPath); - env.execute(); - - return "1,1,Hallo\n" + - "2,2,Hallo Welt\n" + - "3,2,Hallo Welt wie gehts?\n" + - "3,2,ABC\n" + - "5,3,HIJ\n" + - "5,3,IJK\n"; - } - case 10: { - /* - * CoGroup on two custom type inputs using expression keys - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup()); - - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "1,0,test\n" + - "2,6,test\n" + - "3,24,test\n" + - "4,60,test\n" + - "5,120,test\n" + - "6,210,test\n"; - } - case 11: { - /* - * CoGroup on two custom type inputs using expression keys - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<CustomType> coGroupDs = ds.coGroup(ds2) - .where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { - private static final long serialVersionUID = 1L; - - @Override - public void coGroup( - Iterable<POJO> first, - Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, - Collector<CustomType> out) throws Exception { - for(POJO p : first) { - for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { - Assert.assertTrue(p.nestedPojo.longNumber == t.f6); - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); - } - } - } - }); - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "-1,20000,Flink\n" + - "-1,10000,Flink\n" + - "-1,30000,Flink\n"; - } - case 12: { - /* - * CoGroup field-selector (expression keys) + key selector function - * The key selector is unnecessary complicated (Tuple1) ;) - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<CustomType> coGroupDs = ds.coGroup(ds2) - .where(new KeySelector<POJO, Tuple1<Long>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple1<Long> getKey(POJO value) - throws Exception { - return new Tuple1<Long>(value.nestedPojo.longNumber); - } - }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { - private static final long serialVersionUID = 1L; - - @Override - public void coGroup( - Iterable<POJO> first, - Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, - Collector<CustomType> out) throws Exception { - for(POJO p : first) { - for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { - Assert.assertTrue(p.nestedPojo.longNumber == t.f6); - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); - } - } - } - }); - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "-1,20000,Flink\n" + - "-1,10000,Flink\n" + - "-1,30000,Flink\n"; + + @Test + public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception { + /* + * check correctness of cogroup if UDF returns left input objects + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft()); + + coGroupDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n"; + } + + @Test + public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception { + /* + * check correctness of cogroup if UDF returns right input objects + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight()); + + coGroupDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,0,Hallo,1\n" + + "2,2,1,Hallo Welt,2\n" + + "2,3,2,Hallo Welt wie,1\n" + + "3,4,3,Hallo Welt wie gehts?,2\n" + + "3,5,4,ABC,2\n" + + "3,6,5,BCD,3\n"; + } + + @Test + public void testCoGroupWithBroadcastSet() throws Exception { + /* + * Reduce with broadcast set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints"); + + coGroupDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,0,55\n" + + "2,6,55\n" + + "3,24,55\n" + + "4,60,55\n" + + "5,120,55\n"; + } + + @Test + public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() + throws Exception { + /* + * CoGroup on a tuple input with key field selector and a custom type input with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new + KeySelector2()).with(new MixedCoGroup()); + + coGroupDs.writeAsCsv(resultPath); + env.execute(); + + expected = "0,1,test\n" + + "1,2,test\n" + + "2,5,test\n" + + "3,15,test\n" + + "4,33,test\n" + + "5,63,test\n" + + "6,109,test\n" + + "7,4,test\n" + + "8,4,test\n" + + "9,4,test\n" + + "10,5,test\n" + + "11,5,test\n" + + "12,5,test\n" + + "13,5,test\n" + + "14,5,test\n"; + } + + public static class KeySelector2 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector() + throws Exception { + /* + * CoGroup on a tuple input with key field selector and a custom type input with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with + (new MixedCoGroup2()); + + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "0,1,test\n" + + "1,2,test\n" + + "2,5,test\n" + + "3,15,test\n" + + "4,33,test\n" + + "5,63,test\n" + + "6,109,test\n" + + "7,4,test\n" + + "8,4,test\n" + + "9,4,test\n" + + "10,5,test\n" + + "11,5,test\n" + + "12,5,test\n" + + "13,5,test\n" + + "14,5,test\n"; + + } + + public static class KeySelector3 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception { + /* + * CoGroup with multiple key fields + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). + where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup()); + + coGrouped.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception { + /* + * CoGroup with multiple key fields + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). + where(new KeySelector7()). + equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup()); + + coGrouped.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + } + + public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, + Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + } + + public static class KeySelector8 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) { + return new Tuple2<Integer, Long>(t.f0, t.f1); + } + } + + @Test + public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception { + /* + * CoGroup on two custom type inputs using expression keys + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup()); + + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "1,0,test\n" + + "2,6,test\n" + + "3,24,test\n" + + "4,60,test\n" + + "5,120,test\n" + + "6,210,test\n"; + } + + @Test + public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws + Exception { + /* + * CoGroup on two custom type inputs using expression keys + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1()); + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + } + + public static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } } - case 13: { - /* - * CoGroup field-selector (expression keys) + key selector function - * The key selector is simple here - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<CustomType> coGroupDs = ds.coGroup(ds2) - .where(new KeySelector<POJO, Long>() { - private static final long serialVersionUID = 1L; - - @Override - public Long getKey(POJO value) - throws Exception { - return value.nestedPojo.longNumber; - } - }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { - private static final long serialVersionUID = 1L; - - @Override - public void coGroup( - Iterable<POJO> first, - Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, - Collector<CustomType> out) throws Exception { - for(POJO p : first) { - for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { - Assert.assertTrue(p.nestedPojo.longNumber == t.f6); - out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); - } - } - } - }); - coGroupDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "-1,20000,Flink\n" + - "-1,10000,Flink\n" + - "-1,30000,Flink\n"; + } + } + + @Test + public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is unnecessary complicated (Tuple1) ;) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector6()).equalTo(6).with(new CoGroup3()); + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + + } + + public static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple1<Long> getKey(POJO value) + throws Exception { + return new Tuple1<Long>(value.nestedPojo.longNumber); + } + } + + public static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer, + String, Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } } - - default: - throw new IllegalArgumentException("Invalid program id"); + } + } + + @Test + public void testCoGroupFieldSelectorAndKeySelector() throws Exception { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is simple here + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector1()).equalTo(6).with(new CoGroup2()); + coGroupDs.writeAsText(resultPath); + env.execute(); + + expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + } + + public static class KeySelector1 implements KeySelector<POJO, Long> { + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(POJO value) + throws Exception { + return value.nestedPojo.longNumber; + } + } + + public static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String, + Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } } - } - } - + public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java index 7d79ea5..bd32bfc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java @@ -18,10 +18,7 @@ package org.apache.flink.test.javaApiOperators; -import java.io.FileNotFoundException; -import java.io.IOException; import java.util.Collection; -import java.util.LinkedList; import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.RichCrossFunction; @@ -32,371 +29,336 @@ import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) -public class CrossITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 11; - - private int curProgId = config.getInteger("ProgramId", -1); +public class CrossITCase extends MultipleProgramsTestBase { + + public CrossITCase(ExecutionMode mode){ + super(mode); + } + private String resultPath; - private String expectedResult; - - public CrossITCase(Configuration config) { - super(config); + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); } - @Override - protected void testProgram() throws Exception { - expectedResult = CrossProgs.runProgram(curProgId, resultPath); + @Test + public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception { + /* + * check correctness of cross on two tuple inputs + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); + + @Test + public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception { + /* + * check correctness of cross if UDF returns left input object + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,Hi\n" + + "1,1,Hi\n" + + "1,1,Hi\n" + + "2,2,Hello\n" + + "2,2,Hello\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "3,2,Hello world\n" + + "3,2,Hello world\n"; } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception { + /* + * check correctness of cross if UDF returns right input object + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "1,1,0,Hallo,1\n" + + "1,1,0,Hallo,1\n" + + "1,1,0,Hallo,1\n" + + "2,2,1,Hallo Welt,2\n" + + "2,2,1,Hallo Welt,2\n" + + "2,2,1,Hallo Welt,2\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,3,2,Hallo Welt wie,1\n"; - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); } - - private static class CrossProgs { - - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - - /* - * check correctness of cross on two tuple inputs - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "0,HalloHallo\n" + - "1,HalloHallo Welt\n" + - "2,HalloHallo Welt wie\n" + - "1,Hallo WeltHallo\n" + - "2,Hallo WeltHallo Welt\n" + - "3,Hallo WeltHallo Welt wie\n" + - "2,Hallo Welt wieHallo\n" + - "3,Hallo Welt wieHallo Welt\n" + - "4,Hallo Welt wieHallo Welt wie\n"; - } - case 2: { - - /* - * check correctness of cross if UDF returns left input object - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,Hi\n" + - "1,1,Hi\n" + - "1,1,Hi\n" + - "2,2,Hello\n" + - "2,2,Hello\n" + - "2,2,Hello\n" + - "3,2,Hello world\n" + - "3,2,Hello world\n" + - "3,2,Hello world\n"; - - } - case 3: { - - /* - * check correctness of cross if UDF returns right input object - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,0,Hallo,1\n" + - "1,1,0,Hallo,1\n" + - "1,1,0,Hallo,1\n" + - "2,2,1,Hallo Welt,2\n" + - "2,2,1,Hallo Welt,2\n" + - "2,2,1,Hallo Welt,2\n" + - "2,3,2,Hallo Welt wie,1\n" + - "2,3,2,Hallo Welt wie,1\n" + - "2,3,2,Hallo Welt wie,1\n"; - - } - case 4: { - - /* - * check correctness of cross with broadcast set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints"); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "2,0,55\n" + - "3,0,55\n" + - "3,0,55\n" + - "3,0,55\n" + - "4,1,55\n" + - "4,2,55\n" + - "3,0,55\n" + - "4,2,55\n" + - "4,4,55\n"; - } - case 5: { - - /* - * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross) - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "0,HalloHallo\n" + - "1,HalloHallo Welt\n" + - "2,HalloHallo Welt wie\n" + - "1,Hallo WeltHallo\n" + - "2,Hallo WeltHallo Welt\n" + - "3,Hallo WeltHallo Welt wie\n" + - "2,Hallo Welt wieHallo\n" + - "3,Hallo Welt wieHallo Welt\n" + - "4,Hallo Welt wieHallo Welt wie\n"; - - } - case 6: { - - /* - * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross) - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "0,HalloHallo\n" + - "1,HalloHallo Welt\n" + - "2,HalloHallo Welt wie\n" + - "1,Hallo WeltHallo\n" + - "2,Hallo WeltHallo Welt\n" + - "3,Hallo WeltHallo Welt wie\n" + - "2,Hallo Welt wieHallo\n" + - "3,Hallo Welt wieHallo Welt\n" + - "4,Hallo Welt wieHallo Welt wie\n"; - - } - case 7: { - /* - * project cross on a tuple input 1 - */ + @Test + public void testCorrectnessOfCrossWithBroadcastSet() throws Exception { + /* + * check correctness of cross with broadcast set + */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2) - .projectFirst(2, 1) - .projectSecond(3) - .projectFirst(0) - .projectSecond(4,1); + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); - crossDs.writeAsCsv(resultPath); - env.execute(); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints"); - // return expected result - return "Hi,1,Hallo,1,1,1\n" + - "Hi,1,Hallo Welt,1,2,2\n" + - "Hi,1,Hallo Welt wie,1,1,3\n" + - "Hello,2,Hallo,2,1,1\n" + - "Hello,2,Hallo Welt,2,2,2\n" + - "Hello,2,Hallo Welt wie,2,1,3\n" + - "Hello world,2,Hallo,3,1,1\n" + - "Hello world,2,Hallo Welt,3,2,2\n" + - "Hello world,2,Hallo Welt wie,3,1,3\n"; + crossDs.writeAsCsv(resultPath); + env.execute(); - } - case 8: { + expected = "2,0,55\n" + + "3,0,55\n" + + "3,0,55\n" + + "3,0,55\n" + + "4,1,55\n" + + "4,2,55\n" + + "3,0,55\n" + + "4,2,55\n" + + "4,4,55\n"; + } + + @Test + public void testCorrectnessOfCrossWithHuge() throws Exception { + /* + * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; + } + + @Test + public void testCorrectnessOfCrossWithTiny() throws Exception { + /* + * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; + } - /* - * project cross on a tuple input 2 - */ + @Test + public void testProjectCrossOnATupleInput1() throws Exception{ + /* + * project cross on a tuple input 1 + */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2) - .projectSecond(3) - .projectFirst(2, 1) - .projectSecond(4,1) + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2) + .projectFirst(2, 1) + .projectSecond(3) + .projectFirst(0) + .projectSecond(4,1); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "Hi,1,Hallo,1,1,1\n" + + "Hi,1,Hallo Welt,1,2,2\n" + + "Hi,1,Hallo Welt wie,1,1,3\n" + + "Hello,2,Hallo,2,1,1\n" + + "Hello,2,Hallo Welt,2,2,2\n" + + "Hello,2,Hallo Welt wie,2,1,3\n" + + "Hello world,2,Hallo,3,1,1\n" + + "Hello world,2,Hallo Welt,3,2,2\n" + + "Hello world,2,Hallo Welt wie,3,1,3\n"; + } + + @Test + public void testProjectCrossOnATupleInput2() throws Exception { + /* + * project cross on a tuple input 2 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2) + .projectSecond(3) + .projectFirst(2, 1) + .projectSecond(4,1) .projectFirst(0); - crossDs.writeAsCsv(resultPath); - env.execute(); + crossDs.writeAsCsv(resultPath); + env.execute(); - // return expected result - return "Hallo,Hi,1,1,1,1\n" + - "Hallo Welt,Hi,1,2,2,1\n" + - "Hallo Welt wie,Hi,1,1,3,1\n" + - "Hallo,Hello,2,1,1,2\n" + - "Hallo Welt,Hello,2,2,2,2\n" + - "Hallo Welt wie,Hello,2,1,3,2\n" + - "Hallo,Hello world,2,1,1,3\n" + - "Hallo Welt,Hello world,2,2,2,3\n" + - "Hallo Welt wie,Hello world,2,1,3,3\n"; + expected = "Hallo,Hi,1,1,1,1\n" + + "Hallo Welt,Hi,1,2,2,1\n" + + "Hallo Welt wie,Hi,1,1,3,1\n" + + "Hallo,Hello,2,1,1,2\n" + + "Hallo Welt,Hello,2,2,2,2\n" + + "Hallo Welt wie,Hello,2,1,3,2\n" + + "Hallo,Hello world,2,1,1,3\n" + + "Hallo Welt,Hello world,2,2,2,3\n" + + "Hallo Welt wie,Hello world,2,1,3,3\n"; - } - case 9: { - /* - * check correctness of default cross - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + - "(1,1,Hi),(1,1,0,Hallo,1)\n" + - "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" + - "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + - "(2,2,Hello),(1,1,0,Hallo,1)\n" + - "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + - "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" + - "(3,2,Hello world),(1,1,0,Hallo,1)\n" + - "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n"; - - } + } - case 10: { - - /* - * check correctness of cross on two custom type inputs - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross()); - - crossDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "1,0,HiHi\n" - + "2,1,HiHello\n" - + "2,2,HiHello world\n" - + "2,1,HelloHi\n" - + "4,2,HelloHello\n" - + "4,3,HelloHello world\n" - + "2,2,Hello worldHi\n" - + "4,3,Hello worldHello\n" - + "4,4,Hello worldHello world"; - } - - case 11: { - - /* - * check correctness of cross a tuple input and a custom type input - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross()); - - crossDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "2,0,HalloHi\n" + - "3,0,HalloHello\n" + - "3,0,HalloHello world\n" + - "3,0,Hallo WeltHi\n" + - "4,1,Hallo WeltHello\n" + - "4,2,Hallo WeltHello world\n" + - "3,0,Hallo Welt wieHi\n" + - "4,2,Hallo Welt wieHello\n" + - "4,4,Hallo Welt wieHello world\n"; - - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - - } - + @Test + public void testCorrectnessOfDefaultCross() throws Exception { + /* + * check correctness of default cross + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + + "(1,1,Hi),(1,1,0,Hallo,1)\n" + + "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" + + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + + "(2,2,Hello),(1,1,0,Hallo,1)\n" + + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + + "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" + + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + + "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n"; + } + + @Test + public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception { + /* + * check correctness of cross on two custom type inputs + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross()); + + crossDs.writeAsText(resultPath); + env.execute(); + + expected = "1,0,HiHi\n" + + "2,1,HiHello\n" + + "2,2,HiHello world\n" + + "2,1,HelloHi\n" + + "4,2,HelloHello\n" + + "4,3,HelloHello world\n" + + "2,2,Hello worldHi\n" + + "4,3,Hello worldHello\n" + + "4,4,Hello worldHello world"; + } + + @Test + public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception { + /* + * check correctness of cross a tuple input and a custom type input + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross()); + + crossDs.writeAsCsv(resultPath); + env.execute(); + + expected = "2,0,HalloHi\n" + + "3,0,HalloHello\n" + + "3,0,HalloHello world\n" + + "3,0,Hallo WeltHi\n" + + "4,1,Hallo WeltHello\n" + + "4,2,Hallo WeltHello world\n" + + "3,0,Hallo Welt wieHi\n" + + "4,2,Hallo Welt wieHello\n" + + "4,4,Hallo Welt wieHello world\n"; } public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
