http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java new file mode 100644 index 0000000..2452475 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.test.util.TestEnvironment; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +/** + * Integration tests for custom {@link DataDistribution}. 
+ */ +@SuppressWarnings("serial") +public class CustomDistributionITCase extends TestLogger { + + // ------------------------------------------------------------------------ + // The mini cluster that is shared across tests + // ------------------------------------------------------------------------ + + private static LocalFlinkMiniCluster cluster; + + @BeforeClass + public static void setup() throws Exception { + cluster = TestBaseUtils.startCluster(1, 8, false, false, true); + } + + @AfterClass + public static void teardown() throws Exception { + TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + } + + @Before + public void prepare() { + TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false); + clusterEnv.setAsContext(); + } + + @After + public void cleanup() { + TestEnvironment.unsetAsContext(); + } + + // ------------------------------------------------------------------------ + + /** + * Test the record partitioned rightly with one field according to the customized data distribution. 
+ */ + @Test + public void testPartitionWithDistribution1() throws Exception { + final TestDataDist1 dist = new TestDataDist1(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(dist.getParallelism()); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Boolean> result = DataSetUtils + .partitionByRange(input, dist, 0) + .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { + + @Override + public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception { + int pIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3<Integer, Long, String> s : values) { + boolean correctlyPartitioned = true; + if (pIdx == 0) { + Integer[] upper = dist.boundaries[0]; + if (s.f0.compareTo(upper[0]) > 0) { + correctlyPartitioned = false; + } + } + else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { + Integer[] lower = dist.boundaries[pIdx - 1]; + Integer[] upper = dist.boundaries[pIdx]; + if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } + else { + Integer[] lower = dist.boundaries[pIdx - 1]; + if ((s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } + + if (!correctlyPartitioned) { + fail("Record was not correctly partitioned: " + s.toString()); + } + } + } + } + ); + + result.output(new DiscardingOutputFormat<Boolean>()); + env.execute(); + } + + /** + * Test the record partitioned rightly with two fields according to the customized data distribution. 
+ */ + @Test + public void testRangeWithDistribution2() throws Exception { + final TestDataDist2 dist = new TestDataDist2(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(dist.getParallelism()); + + DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements( + new Tuple3<>(1, 5, "Hi"), + new Tuple3<>(1, 6, "Hi"), + new Tuple3<>(1, 7, "Hi"), + new Tuple3<>(1, 11, "Hello"), + new Tuple3<>(2, 3, "World"), + new Tuple3<>(2, 4, "World"), + new Tuple3<>(2, 5, "World"), + new Tuple3<>(2, 13, "Hello World"), + new Tuple3<>(3, 8, "Say"), + new Tuple3<>(4, 0, "Why"), + new Tuple3<>(4, 2, "Java"), + new Tuple3<>(4, 11, "Say Hello"), + new Tuple3<>(5, 1, "Hi Java!"), + new Tuple3<>(5, 2, "Hi Java?"), + new Tuple3<>(5, 3, "Hi Java again") + ); + + DataSet<Boolean> result = DataSetUtils + .partitionByRange(input, dist, 0, 1) + .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() { + + @Override + public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception { + int pIdx = getRuntimeContext().getIndexOfThisSubtask(); + boolean correctlyPartitioned = true; + + for (Tuple3<Integer, Integer, String> s : values) { + + if (pIdx == 0) { + Integer[] upper = dist.boundaries[0]; + if (s.f0.compareTo(upper[0]) > 0 || + (s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0)) { + correctlyPartitioned = false; + } + } + else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { + Integer[] lower = dist.boundaries[pIdx - 1]; + Integer[] upper = dist.boundaries[pIdx]; + + if (s.f0.compareTo(upper[0]) > 0 || + (s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0) || + (s.f0.compareTo(lower[0]) < 0) || + (s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) { + correctlyPartitioned = false; + } + } + else { + Integer[] lower = dist.boundaries[pIdx - 1]; + if ((s.f0.compareTo(lower[0]) < 0) || + 
(s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) { + correctlyPartitioned = false; + } + } + + if (!correctlyPartitioned) { + fail("Record was not correctly partitioned: " + s.toString()); + } + } + } + } + ); + + result.output(new DiscardingOutputFormat<Boolean>()); + env.execute(); + } + + /* + * Test the number of partition keys less than the number of distribution fields + */ + @Test + public void testPartitionKeyLessDistribution() throws Exception { + final TestDataDist2 dist = new TestDataDist2(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(dist.getParallelism()); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Boolean> result = DataSetUtils + .partitionByRange(input, dist, 0) + .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { + + @Override + public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception { + int pIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3<Integer, Long, String> s : values) { + boolean correctlyPartitioned = true; + if (pIdx == 0) { + Integer[] upper = dist.boundaries[0]; + if (s.f0.compareTo(upper[0]) > 0) { + correctlyPartitioned = false; + } + } + else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) { + Integer[] lower = dist.boundaries[pIdx - 1]; + Integer[] upper = dist.boundaries[pIdx]; + if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } + else { + Integer[] lower = dist.boundaries[pIdx - 1]; + if ((s.f0.compareTo(lower[0]) <= 0)) { + correctlyPartitioned = false; + } + } + + if (!correctlyPartitioned) { + fail("Record was not correctly partitioned: " + s.toString()); + } + } + } + } + ); + + result.output(new DiscardingOutputFormat<Boolean>()); + env.execute(); + } + + /* + * Test the number of partition keys larger than the 
number of distribution fields + */ + @Test(expected = IllegalArgumentException.class) + public void testPartitionMoreThanDistribution() throws Exception { + final TestDataDist2 dist = new TestDataDist2(); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + DataSetUtils.partitionByRange(input, dist, 0, 1, 2); + } + + /** + * The class is used to do the tests of range partition with one key. + */ + public static class TestDataDist1 implements DataDistribution { + + public Integer[][] boundaries = new Integer[][]{ + new Integer[]{4}, + new Integer[]{9}, + new Integer[]{13}, + new Integer[]{18} + }; + + public TestDataDist1() {} + + public int getParallelism() { + return boundaries.length; + } + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + return boundaries[bucketNum]; + } + + @Override + public int getNumberOfFields() { + return 1; + } + + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException {} + + @Override + public void read(DataInputView in) throws IOException {} + } + + /** + * The class is used to do the tests of range partition with two keys. 
+ */ + public static class TestDataDist2 implements DataDistribution { + + public Integer[][] boundaries = new Integer[][]{ + new Integer[]{1, 6}, + new Integer[]{2, 4}, + new Integer[]{3, 9}, + new Integer[]{4, 1}, + new Integer[]{5, 2} + }; + + public TestDataDist2() {} + + public int getParallelism() { + return boundaries.length; + } + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + return boundaries[bucketNum]; + } + + @Override + public int getNumberOfFields() { + return 2; + } + + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException {} + + @Override + public void read(DataInputView in) throws IOException {} + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java new file mode 100644 index 0000000..deb5170 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.BufferedReader; +import java.util.Random; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for data sinks. + */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class DataSinkITCase extends MultipleProgramsTestBase { + + public DataSinkITCase(TestExecutionMode mode) { + super(mode); + } + + private String resultPath; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @Test + public void testIntSortingParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env); + ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1); + + env.execute(); + + String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n"; + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + + } + + @Test + public void testStringSortingParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + ds.writeAsText(resultPath).sortLocalOutput("*", 
Order.ASCENDING).setParallelism(1); + + env.execute(); + + String expected = "Hello\n" + + "Hello world\n" + + "Hello world, how are you?\n" + + "Hi\n" + + "I am fine.\n" + + "LOL\n" + + "Luke Skywalker\n" + + "Random comment\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingSingleAscParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.ASCENDING).setParallelism(1); + + env.execute(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n" + + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + + "8,4,Comment#2\n" + + "9,4,Comment#3\n" + + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + + "12,5,Comment#6\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + + "21,6,Comment#15\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingSingleDescParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.DESCENDING).setParallelism(1); + + env.execute(); + + String expected = "21,6,Comment#15\n" + + "20,6,Comment#14\n" + + "19,6,Comment#13\n" + + "18,6,Comment#12\n" + + "17,6,Comment#11\n" + + "16,6,Comment#10\n" + + "15,5,Comment#9\n" + + "14,5,Comment#8\n" + + "13,5,Comment#7\n" + + "12,5,Comment#6\n" + + "11,5,Comment#5\n" + + "10,4,Comment#4\n" + + "9,4,Comment#3\n" + + "8,4,Comment#2\n" + + "7,4,Comment#1\n" + + "6,3,Luke 
Skywalker\n" + + "5,3,I am fine.\n" + + "4,3,Hello world, how are you?\n" + + "3,2,Hello world\n" + + "2,2,Hello\n" + + "1,1,Hi\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingDualParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + ds.writeAsCsv(resultPath) + .sortLocalOutput(1, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING) + .setParallelism(1); + + env.execute(); + + String expected = "16,6,Comment#10\n" + + "17,6,Comment#11\n" + + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + + "21,6,Comment#15\n" + + "11,5,Comment#5\n" + + "12,5,Comment#6\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + + "7,4,Comment#1\n" + + "8,4,Comment#2\n" + + "9,4,Comment#3\n" + + "10,4,Comment#4\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n" + + "6,3,Luke Skywalker\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "1,1,Hi\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingNestedParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = + CollectionDataSets.getGroupSortedNestedTupleDataSet2(env); + ds.writeAsText(resultPath) + .sortLocalOutput("f0.f1", Order.ASCENDING) + .sortLocalOutput("f1", Order.DESCENDING) + .setParallelism(1); + + env.execute(); + + String expected = + "((2,1),a,3)\n" + + "((2,2),b,4)\n" + + "((1,2),a,1)\n" + + "((3,3),c,5)\n" + + "((1,3),a,2)\n" + + "((3,6),c,6)\n" + + "((4,9),c,7)\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testTupleSortingNestedParallelism1_2() throws Exception { + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds = + CollectionDataSets.getGroupSortedNestedTupleDataSet2(env); + ds.writeAsText(resultPath) + .sortLocalOutput(1, Order.ASCENDING) + .sortLocalOutput(2, Order.DESCENDING) + .setParallelism(1); + + env.execute(); + + String expected = + "((2,1),a,3)\n" + + "((1,3),a,2)\n" + + "((1,2),a,1)\n" + + "((2,2),b,4)\n" + + "((4,9),c,7)\n" + + "((3,6),c,6)\n" + + "((3,3),c,5)\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testPojoSortingSingleParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); + ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1); + + env.execute(); + + String expected = "1 First (10,100,1000,One) 10100\n" + + "2 First_ (10,105,1000,One) 10200\n" + + "3 First (11,102,3000,One) 10200\n" + + "4 First_ (11,106,1000,One) 10300\n" + + "5 First (11,102,2000,One) 10100\n" + + "6 Second_ (20,200,2000,Two) 10100\n" + + "7 Third (31,301,2000,Three) 10200\n" + + "8 Third_ (30,300,1000,Three) 10100\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testPojoSortingDualParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); + ds.writeAsText(resultPath) + .sortLocalOutput("str", Order.ASCENDING) + .sortLocalOutput("number", Order.DESCENDING) + .setParallelism(1); + + env.execute(); + + String expected = + "5 First (11,102,2000,One) 10100\n" + + "3 First (11,102,3000,One) 10200\n" + + "1 First (10,100,1000,One) 10100\n" + + "4 First_ (11,106,1000,One) 10300\n" + + "2 First_ (10,105,1000,One) 10200\n" + + "6 Second_ 
(20,200,2000,Two) 10100\n" + + "7 Third (31,301,2000,Three) 10200\n" + + "8 Third_ (30,300,1000,Three) 10100\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + + } + + @Test + public void testPojoSortingNestedParallelism1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); + ds.writeAsText(resultPath) + .sortLocalOutput("nestedTupleWithCustom.f0", Order.ASCENDING) + .sortLocalOutput("nestedTupleWithCustom.f1.myInt", Order.DESCENDING) + .sortLocalOutput("nestedPojo.longNumber", Order.ASCENDING) + .setParallelism(1); + + env.execute(); + + String expected = + "2 First_ (10,105,1000,One) 10200\n" + + "1 First (10,100,1000,One) 10100\n" + + "4 First_ (11,106,1000,One) 10300\n" + + "5 First (11,102,2000,One) 10100\n" + + "3 First (11,102,3000,One) 10200\n" + + "6 Second_ (20,200,2000,Two) 10100\n" + + "8 Third_ (30,300,1000,Three) 10100\n" + + "7 Third (31,301,2000,Three) 10200\n"; + + compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath); + } + + @Test + public void testSortingParallelism4() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> ds = env.generateSequence(0, 1000); + // randomize + ds.map(new MapFunction<Long, Long>() { + + Random rand = new Random(1234L); + @Override + public Long map(Long value) throws Exception { + return rand.nextLong(); + } + }).writeAsText(resultPath) + .sortLocalOutput("*", Order.ASCENDING) + .setParallelism(4); + + env.execute(); + + BufferedReader[] resReaders = getResultReader(resultPath); + for (BufferedReader br : resReaders) { + long cmp = Long.MIN_VALUE; + while (br.ready()) { + long cur = Long.parseLong(br.readLine()); + assertTrue("Invalid order of sorted output", cmp <= cur); + cmp = cur; + } + br.close(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java new file mode 100644 index 0000000..f1a3f08 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSourceITCase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.test.util.JavaProgramTestBase; + +import org.junit.Assert; + +import java.util.List; + +/** + * Tests for the DataSource. 
+ */ +public class DataSourceITCase extends JavaProgramTestBase { + + private String inputPath; + + @Override + protected void preSubmit() throws Exception { + inputPath = createTempFile("input", "ab\n" + + "cd\n" + + "ef\n"); + } + + @Override + protected void testProgram() throws Exception { + /* + * Test passing a configuration object to an input format + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Configuration ifConf = new Configuration(); + ifConf.setString("prepend", "test"); + + DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf); + List<String> result = ds.collect(); + + String expectedResult = "ab\n" + + "cd\n" + + "ef\n"; + + compareResultAsText(result, expectedResult); + } + + private static class TestInputFormat extends TextInputFormat { + private static final long serialVersionUID = 1L; + + public TestInputFormat(Path filePath) { + super(filePath); + } + + @Override + public void configure(Configuration parameters) { + super.configure(parameters); + + Assert.assertNotNull(parameters.getString("prepend", null)); + Assert.assertEquals("test", parameters.getString("prepend", null)); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java new file mode 100644 index 0000000..1215660 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DistinctITCase.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.operators.util.CollectionDataSets.POJO; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for {@link DataSet#distinct}. 
+ */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class DistinctITCase extends MultipleProgramsTestBase { + + public DistinctITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception { + /* + * check correctness of distinct on tuples with key field selector + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2); + + List<Tuple3<Integer, Long, String>> result = distinctDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() throws Exception{ + /* + * check correctness of distinct on tuples with key field selector with not all fields selected + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0); + + List<Tuple1<Integer>> result = distinctDs.collect(); + + String expected = "1\n" + + "2\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnTuplesWithKeyExtractorFunction() throws Exception { + /* + * check correctness of distinct on tuples with key extractor function + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct(new KeySelector1()).project(0); + + List<Tuple1<Integer>> result = 
reduceDs.collect(); + + String expected = "1\n" + "2\n"; + + compareResultAsTuples(result, expected); + } + + private static class KeySelector1 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(Tuple5<Integer, Long, Integer, String, Long> in) { + return in.f0; + } + } + + @Test + public void testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor() throws Exception { + /* + * check correctness of distinct on custom type with type extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<Tuple1<Integer>> reduceDs = ds + .distinct(new KeySelector3()) + .map(new Mapper3()); + + List<Tuple1<Integer>> result = reduceDs.collect(); + + String expected = "1\n" + + "2\n" + + "3\n" + + "4\n" + + "5\n" + + "6\n"; + + compareResultAsTuples(result, expected); + } + + private static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> { + @Override + public Tuple1<Integer> map(CustomType value) throws Exception { + return new Tuple1<Integer>(value.myInt); + } + } + + private static class KeySelector3 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testCorrectnessOfDistinctOnTuples() throws Exception{ + /* + * check correctness of distinct on tuples + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(); + + List<Tuple3<Integer, Long, String>> result = distinctDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n"; + + 
compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws Exception{ + /* + * check correctness of distinct on custom type with tuple-returning type extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> reduceDs = ds + .distinct(new KeySelector2()) + .project(0, 4); + + List<Tuple2<Integer, Long>> result = reduceDs.collect(); + + String expected = "1,1\n" + + "2,1\n" + + "2,2\n" + + "3,2\n" + + "3,3\n" + + "4,1\n" + + "4,2\n" + + "5,1\n" + + "5,2\n" + + "5,3\n"; + + compareResultAsTuples(result, expected); + } + + private static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + } + + @Test + public void testCorrectnessOfDistinctOnTuplesWithFieldExpressions() throws Exception { + /* + * check correctness of distinct on tuples with field expressions + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct("f0").project(0); + + List<Tuple1<Integer>> result = reduceDs.collect(); + + String expected = "1\n" + + "2\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnPojos() throws Exception { + /* + * check correctness of distinct on Pojos + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = 
CollectionDataSets.getDuplicatePojoDataSet(env); + DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2()); + + List<Integer> result = reduceDs.collect(); + + String expected = "10000\n20000\n30000\n"; + + compareResultAsText(result, expected); + } + + private static class Mapper2 implements MapFunction<CollectionDataSets.POJO, Integer> { + @Override + public Integer map(POJO value) throws Exception { + return (int) value.nestedPojo.longNumber; + } + } + + @Test + public void testDistinctOnFullPojo() throws Exception { + /* + * distinct on full Pojo + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env); + DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1()); + + List<Integer> result = reduceDs.collect(); + + String expected = "10000\n20000\n30000\n"; + + compareResultAsText(result, expected); + } + + private static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> { + @Override + public Integer map(POJO value) throws Exception { + return (int) value.nestedPojo.longNumber; + } + } + + @Test + public void testCorrectnessOfDistinctOnAtomic() throws Exception { + /* + * check correctness of distinct on Integers + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env); + DataSet<Integer> reduceDs = ds.distinct(); + + List<Integer> result = reduceDs.collect(); + + String expected = "1\n2\n3\n4\n5"; + + compareResultAsText(result, expected); + } + + @Test + public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception { + /* + * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> reduceDs = 
ds.union(ds).distinct("*"); + + List<String> result = reduceDs.collect(); + + String expected = "I am fine.\n" + + "Luke Skywalker\n" + + "LOL\n" + + "Hello world, how are you?\n" + + "Hi\n" + + "Hello world\n" + + "Hello\n" + + "Random comment\n"; + + compareResultAsText(result, expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java new file mode 100644 index 0000000..f7f993b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Test ExecutionEnvironment from user perspective. + */ +@SuppressWarnings("serial") +public class ExecutionEnvironmentITCase extends TestLogger { + + private static final int PARALLELISM = 5; + + /** + * Ensure that the user can pass a custom configuration object to the LocalEnvironment. 
+ */ + @Test + public void testLocalEnvironmentWithConfig() throws Exception { + Configuration conf = new Configuration(); + conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); + + final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf); + env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX); + env.getConfig().disableSysoutLogging(); + + DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat()) + .rebalance() + .mapPartition(new RichMapPartitionFunction<Integer, Integer>() { + @Override + public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception { + out.collect(getRuntimeContext().getIndexOfThisSubtask()); + } + }); + List<Integer> resultCollection = result.collect(); + assertEquals(PARALLELISM, resultCollection.size()); + } + + private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> { + + private transient boolean emitted; + + @Override + public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { + assertEquals(PARALLELISM, numSplits); + return super.createInputSplits(numSplits); + } + + @Override + public boolean reachedEnd() { + return emitted; + } + + @Override + public Integer nextRecord(Integer reuse) { + if (emitted) { + return null; + } + emitted = true; + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java new file mode 100644 index 0000000..6fa2dce --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FilterITCase.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.List; + +/** + * Integration tests for {@link FilterFunction} and {@link RichFilterFunction}. + */ +@RunWith(Parameterized.class) +public class FilterITCase extends MultipleProgramsTestBase { + public FilterITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAllRejectingFilter() throws Exception { + /* + * Test all-rejecting filter. 
+ */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(new Filter1()); + + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "\n"; + + compareResultAsTuples(result, expected); + } + + private static class Filter1 implements FilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return false; + } + } + + @Test + public void testAllPassingFilter() throws Exception { + /* + * Test all-passing filter. + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. 
+ filter(new Filter2()); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n" + + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + + "8,4,Comment#2\n" + + "9,4,Comment#3\n" + + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + + "12,5,Comment#6\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + + "21,6,Comment#15\n"; + + compareResultAsTuples(result, expected); + } + + private static class Filter2 implements FilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return true; + } + } + + @Test + public void testFilterOnStringTupleField() throws Exception { + /* + * Test filter on String tuple field. + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(new Filter3()); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "3,2,Hello world\n" + + + "4,3,Hello world, how are you?\n"; + + compareResultAsTuples(result, expected); + + } + + private static class Filter3 implements FilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return value.f2.contains("world"); + } + } + + @Test + public void testFilterOnIntegerTupleField() throws Exception { + /* + * Test filter on Integer tuple field. 
+ */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(new Filter4()); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "2,2,Hello\n" + + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + + "8,4,Comment#2\n" + + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + + "14,5,Comment#8\n" + + "16,6,Comment#10\n" + + "18,6,Comment#12\n" + + "20,6,Comment#14\n"; + + compareResultAsTuples(result, expected); + } + + private static class Filter4 implements FilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return (value.f0 % 2) == 0; + } + } + + @Test + public void testFilterBasicType() throws Exception { + /* + * Test filter on basic type + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> filterDs = ds. + filter(new Filter5()); + List<String> result = filterDs.collect(); + + String expected = "Hi\n" + + "Hello\n" + + "Hello world\n" + + "Hello world, how are you?\n"; + + compareResultAsText(result, expected); + } + + private static class Filter5 implements FilterFunction<String> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(String value) throws Exception { + return value.startsWith("H"); + } + } + + @Test + public void testFilterOnCustomType() throws Exception { + /* + * Test filter on custom type + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> filterDs = ds. 
+ filter(new Filter6()); + List<CustomType> result = filterDs.collect(); + + String expected = "3,3,Hello world, how are you?\n" + + + "3,4,I am fine.\n" + + "3,5,Luke Skywalker\n"; + + compareResultAsText(result, expected); + } + + private static class Filter6 implements FilterFunction<CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(CustomType value) throws Exception { + return value.myString.contains("a"); + } + } + + @Test + public void testRichFilterOnStringTupleField() throws Exception { + /* + * Test filter on String tuple field. + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(new RichFilter1()).withBroadcastSet(ints, "ints"); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n"; + + compareResultAsTuples(result, expected); + } + + private static class RichFilter1 extends RichFilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + int literal = -1; + + @Override + public void open(Configuration config) { + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + for (int i: ints) { + literal = literal < i ? 
i : literal; + } + } + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return value.f0 < literal; + } + } + + @Test + public void testFilterWithBroadcastVariables() throws Exception { + /* + * Test filter with broadcast variables + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(new RichFilter2()).withBroadcastSet(intDs, "ints"); + List<Tuple3<Integer, Long, String>> result = filterDs.collect(); + + String expected = "11,5,Comment#5\n" + + "12,5,Comment#6\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + + "15,5,Comment#9\n"; + + compareResultAsTuples(result, expected); + } + + private static class RichFilter2 extends RichFilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private int broadcastSum = 0; + + @Override + public void open(Configuration config) { + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + for (Integer i : ints) { + broadcastSum += i; + } + } + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return (value.f1 == (broadcastSum / 11)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java new file mode 100644 index 0000000..abbd446 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FirstNITCase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.GroupReduceOperator; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for {@link DataSet#first}. 
+ */ +@RunWith(Parameterized.class) +public class FirstNITCase extends MultipleProgramsTestBase { + public FirstNITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testFirstNOnUngroupedDS() throws Exception { + /* + * First-n on ungrouped data set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0); + + List<Tuple1<Integer>> result = seven.collect(); + + String expected = "(7)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testFirstNOnGroupedDS() throws Exception { + /* + * First-n on grouped data set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4) + .map(new OneMapper2()).groupBy(0).sum(1); + + List<Tuple2<Long, Integer>> result = first.collect(); + + String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testFirstNOnGroupedAndSortedDS() throws Exception { + /* + * First-n on grouped and sorted data set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3) + .project(1, 0); + + List<Tuple2<Long, Integer>> result = first.collect(); + + String expected = "(1,1)\n" + + "(2,3)\n(2,2)\n" + + "(3,6)\n(3,5)\n(3,4)\n" + + "(4,10)\n(4,9)\n(4,8)\n" + + "(5,15)\n(5,14)\n(5,13)\n" + + "(6,21)\n(6,20)\n(6,19)\n"; + + compareResultAsText(result, expected); + } + + /** + * Test for FLINK-2135. 
+ */ + @Test + public void testFaultyCast() throws Exception { + ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> b = ee.fromElements("a", "b"); + GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() { + @Override + public Long getKey(String value) throws Exception { + return 1L; + } + }).sortGroup(new KeySelector<String, Double>() { + @Override + public Double getKey(String value) throws Exception { + return 1.0; + } + }, Order.DESCENDING).first(1); + + List<String> result = b.collect(); + + String expected = "a\nb"; + + compareResultAsText(result, expected); + } + + private static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> { + private static final long serialVersionUID = 1L; + private final Tuple1<Integer> one = new Tuple1<Integer>(1); + @Override + public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) { + return one; + } + } + + private static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> { + private static final long serialVersionUID = 1L; + private final Tuple2<Long, Integer> one = new Tuple2<>(0L, 1); + + @Override + public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) { + one.f0 = value.f1; + return one; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java new file mode 100644 index 0000000..4b683a9 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/FlatMapITCase.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.List; + +/** + * Integration tests for {@link FlatMapFunction} and {@link RichFlatMapFunction}. 
+ */ +@RunWith(Parameterized.class) +public class FlatMapITCase extends MultipleProgramsTestBase { + public FlatMapITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testNonPassingFlatMap() throws Exception { + /* + * Test non-passing flatmap + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> nonPassingFlatMapDs = ds. + flatMap(new FlatMapper1()); + + List<String> result = nonPassingFlatMapDs.collect(); + + String expected = "\n"; + + compareResultAsText(result, expected); + } + + private static class FlatMapper1 implements FlatMapFunction<String, String> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + if (value.contains("bananas")) { + out.collect(value); + } + } + } + + @Test + public void testDataDuplicatingFlatMap() throws Exception { + /* + * Test data duplicating flatmap + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> ds = CollectionDataSets.getStringDataSet(env); + DataSet<String> duplicatingFlatMapDs = ds. 
+ flatMap(new FlatMapper2()); + + List<String> result = duplicatingFlatMapDs.collect(); + + String expected = "Hi\n" + "HI\n" + + "Hello\n" + "HELLO\n" + + "Hello world\n" + "HELLO WORLD\n" + + "Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" + + "I am fine.\n" + "I AM FINE.\n" + + "Luke Skywalker\n" + "LUKE SKYWALKER\n" + + "Random comment\n" + "RANDOM COMMENT\n" + + "LOL\n" + "LOL\n"; + + compareResultAsText(result, expected); + } + + private static class FlatMapper2 implements FlatMapFunction<String, String> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + out.collect(value); + out.collect(value.toUpperCase()); + } + } + + @Test + public void testFlatMapWithVaryingNumberOfEmittedTuples() throws Exception { + /* + * Test flatmap with varying number of emitted tuples + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds. 
+ flatMap(new FlatMapper3()); + + List<Tuple3<Integer, Long, String>> result = varyingTuplesMapDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + "2,2,Hello\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n" + "5,3,I am fine.\n" + + "7,4,Comment#1\n" + + "8,4,Comment#2\n" + "8,4,Comment#2\n" + + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "11,5,Comment#5\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "14,5,Comment#8\n" + + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + "17,6,Comment#11\n" + + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "20,6,Comment#14\n"; + + compareResultAsTuples(result, expected); + } + + private static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple3<Integer, Long, String> value, + Collector<Tuple3<Integer, Long, String>> out) throws Exception { + final int numTuples = value.f0 % 3; + for (int i = 0; i < numTuples; i++) { + out.collect(value); + } + } + } + + @Test + public void testTypeConversionFlatMapperCustomToTuple() throws Exception { + /* + * Test type conversion flatmapper (Custom -> Tuple) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds. 
+ flatMap(new FlatMapper4()); + + List<Tuple3<Integer, Long, String>> result = typeConversionFlatMapDs.collect(); + + String expected = "1,0,Hi\n" + + "2,1,Hello\n" + + "2,2,Hello world\n" + + "3,3,Hello world, how are you?\n" + + "3,4,I am fine.\n" + + "3,5,Luke Skywalker\n" + + "4,6,Comment#1\n" + + "4,7,Comment#2\n" + + "4,8,Comment#3\n" + + "4,9,Comment#4\n" + + "5,10,Comment#5\n" + + "5,11,Comment#6\n" + + "5,12,Comment#7\n" + + "5,13,Comment#8\n" + + "5,14,Comment#9\n" + + "6,15,Comment#10\n" + + "6,16,Comment#11\n" + + "6,17,Comment#12\n" + + "6,18,Comment#13\n" + + "6,19,Comment#14\n" + + "6,20,Comment#15\n"; + + compareResultAsTuples(result, expected); + } + + /** Flat-map UDF that writes value.myInt, value.myLong and value.myString into positions 0, 1 and 2 of a single reused output tuple and emits that tuple once per input. */ private static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private final Tuple3<Integer, Long, String> outTuple = + new Tuple3<Integer, Long, String>(); + + @Override + public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out) throws Exception { + outTuple.setField(value.myInt, 0); + outTuple.setField(value.myLong, 1); + outTuple.setField(value.myString, 2); + out.collect(outTuple); + } + } + + @Test + public void testTypeConversionFlatMapperTupleToBasic() throws Exception { + /* + * Test type conversion flatmapper (Tuple -> Basic) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<String> typeConversionFlatMapDs = ds. 
+ flatMap(new FlatMapper5()); + + List<String> result = typeConversionFlatMapDs.collect(); + + String expected = "Hi\n" + "Hello\n" + "Hello world\n" + + + "Hello world, how are you?\n" + + "I am fine.\n" + "Luke Skywalker\n" + + "Comment#1\n" + "Comment#2\n" + + "Comment#3\n" + "Comment#4\n" + + "Comment#5\n" + "Comment#6\n" + + "Comment#7\n" + "Comment#8\n" + + "Comment#9\n" + "Comment#10\n" + + "Comment#11\n" + "Comment#12\n" + + "Comment#13\n" + "Comment#14\n" + + "Comment#15\n"; + + compareResultAsText(result, expected); + } + + /** Flat-map UDF that emits only the String field f2 of each input tuple. */ private static class FlatMapper5 implements FlatMapFunction<Tuple3<Integer, Long, String>, String> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception { + out.collect(value.f2); + } + } + + @Test + public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws Exception { + /* + * Test flatmapper if UDF returns input object + * multiple times and changes it in between + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds. 
+ flatMap(new FlatMapper6()); + + List<Tuple3<Integer, Long, String>> result = inputObjFlatMapDs.collect(); + + String expected = "0,1,Hi\n" + + "0,2,Hello\n" + "1,2,Hello\n" + + "0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" + + "0,3,I am fine.\n" + + "0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" + + "0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" + + "0,4,Comment#3\n" + + "0,4,Comment#4\n" + "1,4,Comment#4\n" + + "0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" + + "0,5,Comment#7\n" + + "0,5,Comment#8\n" + "1,5,Comment#8\n" + + "0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" + + "0,6,Comment#11\n" + + "0,6,Comment#12\n" + "1,6,Comment#12\n" + + "0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" + + "0,6,Comment#15\n"; + + compareResultAsTuples(result, expected); + } + + /** Flat-map UDF that emits the input tuple object itself (value.f0 % 4) times, overwriting position 0 with the loop index before each emission. */ private static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple3<Integer, Long, String> value, + Collector<Tuple3<Integer, Long, String>> out) throws Exception { + final int numTuples = value.f0 % 4; + for (int i = 0; i < numTuples; i++) { + value.setField(i, 0); + out.collect(value); + } + } + } + + @Test + public void testFlatMapWithBroadcastSet() throws Exception { + /* + * Test flatmap with broadcast set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds. 
+ flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints"); + List<Tuple3<Integer, Long, String>> result = bcFlatMapDs.collect(); + + String expected = "55,1,Hi\n" + + "55,2,Hello\n" + + "55,2,Hello world\n" + + "55,3,Hello world, how are you?\n" + + "55,3,I am fine.\n" + + "55,3,Luke Skywalker\n" + + "55,4,Comment#1\n" + + "55,4,Comment#2\n" + + "55,4,Comment#3\n" + + "55,4,Comment#4\n" + + "55,5,Comment#5\n" + + "55,5,Comment#6\n" + + "55,5,Comment#7\n" + + "55,5,Comment#8\n" + + "55,5,Comment#9\n" + + "55,6,Comment#10\n" + + "55,6,Comment#11\n" + + "55,6,Comment#12\n" + + "55,6,Comment#13\n" + + "55,6,Comment#14\n" + + "55,6,Comment#15\n"; + + compareResultAsTuples(result, expected); + } + + /** Rich flat-map UDF: open() sums the "ints" broadcast variable, and flatMap() emits every input with position 0 replaced by that sum while f1 and f2 are passed through. */ private static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private final Tuple3<Integer, Long, String> outTuple = + new Tuple3<Integer, Long, String>(); + /* NOTE(review): despite the name, flatMap() writes this value into the first tuple position, not f2. */ private Integer f2Replace = 0; + + @Override + public void open(Configuration config) { + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + int sum = 0; + for (Integer i : ints) { + sum += i; + } + f2Replace = sum; + } + + @Override + public void flatMap(Tuple3<Integer, Long, String> value, + Collector<Tuple3<Integer, Long, String>> out) throws Exception { + outTuple.setFields(f2Replace, value.f1, value.f2); + out.collect(outTuple); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java new file mode 100644 index 0000000..f0ee031 --- /dev/null +++ 
// Patch header carried over from the source diff:
// b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupCombineITCase.java @@ -0,0 +1,484 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.test.operators;

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;

/**
 * The GroupCombine operator is not easy to test because it is essentially just a combiner. The result can be
 * the result of a normal groupReduce at any stage of its execution. The basic idea is to preserve the grouping key
 * in the partial result, so that we can do a reduceGroup afterwards to finalize the results for verification.
 * In addition, we can use hashPartition to partition the data and check if no shuffling (just combining) has
 * been performed.
 */
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class GroupCombineITCase extends MultipleProgramsTestBase {

	public GroupCombineITCase(TestExecutionMode mode) {
		super(mode);
	}

	/** Expected output of every identity pipeline: the 3-tuple test data set, one tuple per line. */
	private static final String identityResult = "1,1,Hi\n" +
			"2,2,Hello\n" +
			"3,2,Hello world\n" +
			"4,3,Hello world, how are you?\n" +
			"5,3,I am fine.\n" +
			"6,3,Luke Skywalker\n" +
			"7,4,Comment#1\n" +
			"8,4,Comment#2\n" +
			"9,4,Comment#3\n" +
			"10,4,Comment#4\n" +
			"11,5,Comment#5\n" +
			"12,5,Comment#6\n" +
			"13,5,Comment#7\n" +
			"14,5,Comment#8\n" +
			"15,5,Comment#9\n" +
			"16,6,Comment#10\n" +
			"17,6,Comment#11\n" +
			"18,6,Comment#12\n" +
			"19,6,Comment#13\n" +
			"20,6,Comment#14\n" +
			"21,6,Comment#15\n";

	/**
	 * Runs an identity combineGroup followed by an identity reduceGroup on the ungrouped
	 * data set and expects the input back unchanged.
	 */
	@Test
	public void testAllGroupCombineIdentity() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
				// combine
				.combineGroup(new IdentityFunction())
				// fully reduce
				.reduceGroup(new IdentityFunction());

		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();

		compareResultAsTuples(result, identityResult);
	}

	/**
	 * Same pipeline as {@link #testAllGroupCombineIdentity()}: identity combine, then identity
	 * reduce, on the ungrouped data set.
	 */
	@Test
	public void testIdentity() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
				// combine
				.combineGroup(new IdentityFunction())
				// fully reduce
				.reduceGroup(new IdentityFunction());

		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();

		compareResultAsTuples(result, identityResult);
	}

	/**
	 * Groups by field 1, applies an identity combineGroup and then an identity reduceGroup,
	 * and expects the input back unchanged.
	 */
	@Test
	public void testIdentityWithGroupBy() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
				.groupBy(1)
				// combine
				.combineGroup(new IdentityFunction())
				// fully reduce
				.reduceGroup(new IdentityFunction());

		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();

		compareResultAsTuples(result, identityResult);
	}

	/**
	 * Groups by field 1 and sorts each group descending on field 1 before both the identity
	 * combine and the identity reduce; expects the input back unchanged.
	 */
	@Test
	public void testIdentityWithGroupByAndSort() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
				.groupBy(1)
				.sortGroup(1, Order.DESCENDING)
				// reduce partially
				.combineGroup(new IdentityFunction())
				.groupBy(1)
				.sortGroup(1, Order.DESCENDING)
				// fully reduce
				.reduceGroup(new IdentityFunction());

		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();

		compareResultAsTuples(result, identityResult);
	}

	/**
	 * Wraps each tuple with its grouping key (field 1), combines partially and then reduces
	 * fully with the same Tuple3 -> Tuple3 function, and checks the per-key sums.
	 */
	@Test
	public void testPartialReduceWithIdenticalInputOutputType() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// data
		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
				// wrap values as Kv pairs with the grouping key as key
				.map(new Tuple3KvWrapper());

		List<Tuple3<Integer, Long, String>> result = dsWrapped
				.groupBy(0)
				// reduce partially
				.combineGroup(new Tuple3toTuple3GroupReduce())
				.groupBy(0)
				// reduce fully to check result
				.reduceGroup(new Tuple3toTuple3GroupReduce())
				//unwrap
				.map(new MapFunction<Tuple2<Long, Tuple3<Integer, Long, String>>, Tuple3<Integer, Long, String>>() {
					@Override
					public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
						return value.f1;
					}
				}).collect();

		String expected = "1,1,combined\n" +
				"5,4,combined\n" +
				"15,9,combined\n" +
				"34,16,combined\n" +
				"65,25,combined\n" +
				"111,36,combined\n";

		compareResultAsTuples(result, expected);
	}

	/**
	 * Like {@link #testPartialReduceWithIdenticalInputOutputType()}, but the combine step changes
	 * the value type from Tuple3 to Tuple2 and the final reduce works on that intermediate type.
	 */
	@Test
	public void testPartialReduceWithDifferentInputOutputType() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// data
		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
				// wrap values as Kv pairs with the grouping key as key
				.map(new Tuple3KvWrapper());

		List<Tuple2<Integer, Long>> result = dsWrapped
				.groupBy(0)
				// reduce partially
				.combineGroup(new Tuple3toTuple2GroupReduce())
				.groupBy(0)
				// reduce fully to check result
				.reduceGroup(new Tuple2toTuple2GroupReduce())
				//unwrap
				.map(new MapFunction<Tuple2<Long, Tuple2<Integer, Long>>, Tuple2<Integer, Long>>() {
					@Override
					public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
						return value.f1;
					}
				}).collect();

		String expected = "1,3\n" +
				"5,20\n" +
				"15,58\n" +
				"34,52\n" +
				"65,70\n" +
				"111,96\n";

		compareResultAsTuples(result, expected);
	}

	/**
	 * Checks that no shuffle is being executed: after hash-partitioning on field 0, a
	 * combineGroup on field 1 must NOT produce the per-key counts a full grouping would give.
	 * Skipped in COLLECTION mode.
	 */
	@Test
	public void testCheckPartitionShuffleGroupBy() throws Exception {

		org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// data
		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		// partition and group data
		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);

		List<Tuple2<Long, Integer>> result = partitionedDS
				.combineGroup(
						new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
							@Override
							public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
								int count = 0;
								long key = 0;
								for (Tuple3<Integer, Long, String> value : values) {
									key = value.f1;
									count++;
								}
								out.collect(new Tuple2<>(key, count));
							}
						}).collect();

		// Counts a complete grouping on field 1 would yield. A missing comma used to fuse
		// "(5,5)" and "(4,4)" into a single element, and the array was never sorted like the
		// result, so the comparison below could not succeed and the check was vacuous.
		String[] localExpected = new String[] { "(6,6)", "(5,5)", "(4,4)", "(3,3)", "(2,2)", "(1,1)" };
		Arrays.sort(localExpected);

		String[] resultAsStringArray = new String[result.size()];
		for (int i = 0; i < resultAsStringArray.length; ++i) {
			resultAsStringArray[i] = result.get(i).toString();
		}
		Arrays.sort(resultAsStringArray);

		Assert.assertFalse("The two arrays were identical.", Arrays.equals(localExpected, resultAsStringArray));
	}

	/**
	 * Checks that with a parallelism of 1 the combine yields the same data as a shuffle would:
	 * the complete per-key counts.
	 */
	@Test
	public void testCheckPartitionShuffleDOP1() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		env.setParallelism(1);

		// data
		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

		// partition and group data
		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);

		List<Tuple2<Long, Integer>> result = partitionedDS
				.combineGroup(
						new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
							@Override
							public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
								int count = 0;
								long key = 0;
								for (Tuple3<Integer, Long, String> value : values) {
									key = value.f1;
									count++;
								}
								out.collect(new Tuple2<>(key, count));
							}
						}).collect();

		String expected = "6,6\n" +
				"5,5\n" +
				"4,4\n" +
				"3,3\n" +
				"2,2\n" +
				"1,1\n";

		compareResultAsTuples(result, expected);
	}

	/**
	 * Checks that all API methods are callable: combineGroup on a DataSet, an UnsortedGrouping
	 * and a SortedGrouping. Results are discarded.
	 */
	@Test
	public void testAPI() throws Exception {

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple1<String>> ds = CollectionDataSets.getStringDataSet(env).map(new MapFunction<String, Tuple1<String>>() {
			@Override
			public Tuple1<String> map(String value) throws Exception {
				return new Tuple1<>(value);
			}
		});

		// all methods on DataSet
		ds.combineGroup(new GroupCombineFunctionExample())
				.output(new DiscardingOutputFormat<Tuple1<String>>());

		// all methods on UnsortedGrouping
		ds.groupBy(0).combineGroup(new GroupCombineFunctionExample())
				.output(new DiscardingOutputFormat<Tuple1<String>>());

		// all methods on SortedGrouping
		ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample())
				.output(new DiscardingOutputFormat<Tuple1<String>>());

		env.execute();
	}

	/** Combiner that forwards every input value unchanged. */
	private static class GroupCombineFunctionExample implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> {

		@Override
		public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
			for (Tuple1<String> value : values) {
				out.collect(value);
			}
		}
	}

	/**
	 * For Scala GroupCombineITCase.
	 */
	public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {

		@Override
		public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception {
			for (scala.Tuple1<String> value : values) {
				out.collect(value);
			}
		}
	}

	/** Combine and reduce function that emits a fresh copy of every input tuple. */
	private static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
			GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {

		@Override
		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
			for (Tuple3<Integer, Long, String> value : values) {
				out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
			}
		}

		@Override
		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
			for (Tuple3<Integer, Long, String> value : values) {
				out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
			}
		}
	}

	/**
	 * Collapses a group of keyed Tuple3 values into one keyed Tuple3 holding the sums of f0 and
	 * f1 and the marker string "combined". Reduce delegates to combine.
	 */
	private static class Tuple3toTuple3GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
			Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {

		@Override
		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long,
				Tuple3<Integer, Long, String>>> out) throws Exception {
			int i = 0;
			long l = 0;
			long key = 0;

			// collapse groups
			for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
				key = value.f0;
				Tuple3<Integer, Long, String> extracted = value.f1;
				i += extracted.f0;
				l += extracted.f1;
			}

			Tuple3<Integer, Long, String> result = new Tuple3<>(i, l, "combined");
			out.collect(new Tuple2<>(key, result));
		}

		@Override
		public void reduce(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values,
				Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
			combine(values, out);
		}
	}

	/**
	 * Combines keyed Tuple3 values into one keyed Tuple2: the sum of f0, and the sum of f1 plus
	 * the length of f2. Reduce works on the intermediate Tuple2 type and delegates to
	 * {@link Tuple2toTuple2GroupReduce}.
	 */
	private static class Tuple3toTuple2GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
			Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

		@Override
		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long,
				Tuple2<Integer, Long>>> out) throws Exception {
			int i = 0;
			long l = 0;
			long key = 0;

			// collapse groups
			for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
				key = value.f0;
				Tuple3<Integer, Long, String> extracted = value.f1;
				i += extracted.f0;
				l += extracted.f1 + extracted.f2.length();
			}

			Tuple2<Integer, Long> result = new Tuple2<>(i, l);
			out.collect(new Tuple2<>(key, result));
		}

		@Override
		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long,
				Tuple2<Integer, Long>>> out) throws Exception {
			new Tuple2toTuple2GroupReduce().reduce(values, out);
		}
	}

	/**
	 * Collapses a group of keyed Tuple2 values into one keyed Tuple2 holding the sums of f0 and
	 * f1. Reduce delegates to combine.
	 */
	private static class Tuple2toTuple2GroupReduce implements KvGroupReduce<Long, Tuple2<Integer, Long>,
			Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

		@Override
		public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer,
				Long>>> out) throws Exception {
			int i = 0;
			long l = 0;
			long key = 0;

			// collapse groups
			for (Tuple2<Long, Tuple2<Integer, Long>> value : values) {
				key = value.f0;
				Tuple2<Integer, Long> extracted = value.f1;
				i += extracted.f0;
				l += extracted.f1;
			}

			Tuple2<Integer, Long> result = new Tuple2<>(i, l);

			out.collect(new Tuple2<>(key, result));
		}

		@Override
		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long,
				Tuple2<Integer, Long>>> out) throws Exception {
			combine(values, out);
		}
	}

	/**
	 * Wraps a tuple as a (key, value) pair using its field 1 as the key. Static so that the
	 * function object carries no reference to the enclosing test instance.
	 */
	private static class Tuple3KvWrapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long,
			Tuple3<Integer, Long, String>>> {
		@Override
		public Tuple2<Long, Tuple3<Integer, Long, String>> map(Tuple3<Integer, Long, String> value) throws Exception {
			return new Tuple2<>(value.f1, value);
		}
	}

	/** A function usable both as the combiner (IN -> INT) and as the final reducer (INT -> OUT). */
	private interface CombineAndReduceGroup<IN, INT, OUT> extends GroupCombineFunction<IN, INT>,
			GroupReduceFunction<INT, OUT> {
	}

	/** {@link CombineAndReduceGroup} over (key, value) pairs in which the key type stays fixed. */
	private interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>,
			Tuple2<K, OUT>> {
	}

}