http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java deleted file mode 100644 index fb3e589..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.javaApiOperators; - -import java.util.Collection; -import java.util.List; - -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Assert; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -@RunWith(Parameterized.class) -public class MapITCase extends MultipleProgramsTestBase { - - public MapITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testIdentityMapWithBasicType() throws Exception { - /* - * Test identity map with basic type - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<String> ds = CollectionDataSets.getStringDataSet(env); - DataSet<String> identityMapDs = ds. - map(new Mapper1()); - - List<String> result = identityMapDs.collect(); - - String expected = "Hi\n" + - "Hello\n" + - "Hello world\n" + - "Hello world, how are you?\n" + - "I am fine.\n" + - "Luke Skywalker\n" + - "Random comment\n" + - "LOL\n"; - - compareResultAsText(result, expected); - } - - @Test - public void testRuntimeContextAndExecutionConfigParams() throws Exception { - /* - * Test identity map with basic type - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().setNumberOfExecutionRetries(1000); - env.getConfig().setTaskCancellationInterval(50000); - - DataSet<String> ds = CollectionDataSets.getStringDataSet(env); - DataSet<String> identityMapDs = ds. 
- map(new RichMapFunction<String, String>() { - @Override - public String map(String value) throws Exception { - Assert.assertTrue(1000 == getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries()); - Assert.assertTrue(50000 == getRuntimeContext().getExecutionConfig().getTaskCancellationInterval()); - return value; - } - }); - - List<String> result = identityMapDs.collect(); - - String expected = "Hi\n" + - "Hello\n" + - "Hello world\n" + - "Hello world, how are you?\n" + - "I am fine.\n" + - "Luke Skywalker\n" + - "Random comment\n" + - "LOL\n"; - - compareResultAsText(result, expected); - } - - public static class Mapper1 implements MapFunction<String, String> { - private static final long serialVersionUID = 1L; - - @Override - public String map(String value) throws Exception { - return value; - } - } - - @Test - public void testIdentityMapWithTuple() throws Exception { - /* - * Test identity map with a tuple - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds. - map(new Mapper2()); - - List<Tuple3<Integer, Long, String>> result = identityMapDs.collect(); - - String expected = "1,1,Hi\n" + - "2,2,Hello\n" + - "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + - "5,3,I am fine.\n" + - "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + - "8,4,Comment#2\n" + - "9,4,Comment#3\n" + - "10,4,Comment#4\n" + - "11,5,Comment#5\n" + - "12,5,Comment#6\n" + - "13,5,Comment#7\n" + - "14,5,Comment#8\n" + - "15,5,Comment#9\n" + - "16,6,Comment#10\n" + - "17,6,Comment#11\n" + - "18,6,Comment#12\n" + - "19,6,Comment#13\n" + - "20,6,Comment#14\n" + - "21,6,Comment#15\n"; - - compareResultAsTuples(result, expected); - } - - public static class Mapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) - throws Exception { - return value; - } - } - - @Test - public void testTypeConversionMapperCustomToTuple() throws Exception { - /* - * Test type conversion mapper (Custom -> Tuple) - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds. 
- map(new Mapper3()); - - List<Tuple3<Integer, Long, String>> result = typeConversionMapDs.collect(); - - String expected = "1,0,Hi\n" + - "2,1,Hello\n" + - "2,2,Hello world\n" + - "3,3,Hello world, how are you?\n" + - "3,4,I am fine.\n" + - "3,5,Luke Skywalker\n" + - "4,6,Comment#1\n" + - "4,7,Comment#2\n" + - "4,8,Comment#3\n" + - "4,9,Comment#4\n" + - "5,10,Comment#5\n" + - "5,11,Comment#6\n" + - "5,12,Comment#7\n" + - "5,13,Comment#8\n" + - "5,14,Comment#9\n" + - "6,15,Comment#10\n" + - "6,16,Comment#11\n" + - "6,17,Comment#12\n" + - "6,18,Comment#13\n" + - "6,19,Comment#14\n" + - "6,20,Comment#15\n"; - - compareResultAsTuples(result, expected); - } - - public static class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> { - private static final long serialVersionUID = 1L; - private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); - - @Override - public Tuple3<Integer, Long, String> map(CustomType value) throws Exception { - out.setField(value.myInt, 0); - out.setField(value.myLong, 1); - out.setField(value.myString, 2); - return out; - } - } - - @Test - public void testTypeConversionMapperTupleToBasic() throws Exception { - /* - * Test type conversion mapper (Tuple -> Basic) - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<String> typeConversionMapDs = ds. - map(new Mapper4()); - - List<String> result = typeConversionMapDs.collect(); - - String expected = "Hi\n" + "Hello\n" + "Hello world\n" + - "Hello world, how are you?\n" + - "I am fine.\n" + "Luke Skywalker\n" + - "Comment#1\n" + "Comment#2\n" + - "Comment#3\n" + "Comment#4\n" + - "Comment#5\n" + "Comment#6\n" + - "Comment#7\n" + "Comment#8\n" + - "Comment#9\n" + "Comment#10\n" + - "Comment#11\n" + "Comment#12\n" + - "Comment#13\n" + "Comment#14\n" + - "Comment#15\n"; - - compareResultAsText(result, expected); - } - - public static class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> { - private static final long serialVersionUID = 1L; - - @Override - public String map(Tuple3<Integer, Long, String> value) throws Exception { - return value.getField(2); - } - } - - @Test - public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws - Exception { - /* - * Test mapper on tuple - Increment Integer field, reorder second and third fields - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds. 
- map(new Mapper5()); - - List<Tuple3<Integer, String, Long>> result = tupleMapDs.collect(); - - String expected = "2,Hi,1\n" + - "3,Hello,2\n" + - "4,Hello world,2\n" + - "5,Hello world, how are you?,3\n" + - "6,I am fine.,3\n" + - "7,Luke Skywalker,3\n" + - "8,Comment#1,4\n" + - "9,Comment#2,4\n" + - "10,Comment#3,4\n" + - "11,Comment#4,4\n" + - "12,Comment#5,5\n" + - "13,Comment#6,5\n" + - "14,Comment#7,5\n" + - "15,Comment#8,5\n" + - "16,Comment#9,5\n" + - "17,Comment#10,6\n" + - "18,Comment#11,6\n" + - "19,Comment#12,6\n" + - "20,Comment#13,6\n" + - "21,Comment#14,6\n" + - "22,Comment#15,6\n"; - - compareResultAsTuples(result, expected); - } - - public static class Mapper5 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>> { - private static final long serialVersionUID = 1L; - private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>(); - - @Override - public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value) - throws Exception { - Integer incr = Integer.valueOf(value.f0.intValue() + 1); - out.setFields(incr, value.f2, value.f1); - return out; - } - } - - @Test - public void testMapperOnCustomLowercaseString() throws Exception { - /* - * Test mapper on Custom - lowercase myString - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> customMapDs = ds. - map(new Mapper6()); - - List<CustomType> result = customMapDs.collect(); - - String expected = "1,0,hi\n" + - "2,1,hello\n" + - "2,2,hello world\n" + - "3,3,hello world, how are you?\n" + - "3,4,i am fine.\n" + - "3,5,luke skywalker\n" + - "4,6,comment#1\n" + - "4,7,comment#2\n" + - "4,8,comment#3\n" + - "4,9,comment#4\n" + - "5,10,comment#5\n" + - "5,11,comment#6\n" + - "5,12,comment#7\n" + - "5,13,comment#8\n" + - "5,14,comment#9\n" + - "6,15,comment#10\n" + - "6,16,comment#11\n" + - "6,17,comment#12\n" + - "6,18,comment#13\n" + - "6,19,comment#14\n" + - "6,20,comment#15\n"; - - compareResultAsText(result, expected); - } - - public static class Mapper6 implements MapFunction<CustomType, CustomType> { - private static final long serialVersionUID = 1L; - private final CustomType out = new CustomType(); - - @Override - public CustomType map(CustomType value) throws Exception { - out.myInt = value.myInt; - out.myLong = value.myLong; - out.myString = value.myString.toLowerCase(); - return out; - } - } - - @Test - public void test() throws Exception { - /* - * Test mapper if UDF returns input object - increment first field of a tuple - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds. 
- map(new Mapper7()); - - List<Tuple3<Integer, Long, String>> result = inputObjMapDs.collect(); - - String expected = "2,1,Hi\n" + - "3,2,Hello\n" + - "4,2,Hello world\n" + - "5,3,Hello world, how are you?\n" + - "6,3,I am fine.\n" + - "7,3,Luke Skywalker\n" + - "8,4,Comment#1\n" + - "9,4,Comment#2\n" + - "10,4,Comment#3\n" + - "11,4,Comment#4\n" + - "12,5,Comment#5\n" + - "13,5,Comment#6\n" + - "14,5,Comment#7\n" + - "15,5,Comment#8\n" + - "16,5,Comment#9\n" + - "17,6,Comment#10\n" + - "18,6,Comment#11\n" + - "19,6,Comment#12\n" + - "20,6,Comment#13\n" + - "21,6,Comment#14\n" + - "22,6,Comment#15\n"; - - compareResultAsTuples(result, expected); - } - - public static class Mapper7 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) - throws Exception { - Integer incr = Integer.valueOf(value.f0.intValue() + 1); - value.setField(incr, 0); - return value; - } - } - - @Test - public void testMapWithBroadcastSet() throws Exception { - /* - * Test map with broadcast set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds. - map(new RichMapper1()).withBroadcastSet(ints, "ints"); - List<Tuple3<Integer, Long, String>> result = bcMapDs.collect(); - - String expected = "55,1,Hi\n" + - "55,2,Hello\n" + - "55,2,Hello world\n" + - "55,3,Hello world, how are you?\n" + - "55,3,I am fine.\n" + - "55,3,Luke Skywalker\n" + - "55,4,Comment#1\n" + - "55,4,Comment#2\n" + - "55,4,Comment#3\n" + - "55,4,Comment#4\n" + - "55,5,Comment#5\n" + - "55,5,Comment#6\n" + - "55,5,Comment#7\n" + - "55,5,Comment#8\n" + - "55,5,Comment#9\n" + - "55,6,Comment#10\n" + - "55,6,Comment#11\n" + - "55,6,Comment#12\n" + - "55,6,Comment#13\n" + - "55,6,Comment#14\n" + - "55,6,Comment#15\n"; - - compareResultAsTuples(result, expected); - } - - public static class RichMapper1 extends RichMapFunction<Tuple3<Integer,Long,String>, - Tuple3<Integer, Long,String>> { - private static final long serialVersionUID = 1L; - private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); - private Integer f2Replace = 0; - - @Override - public void open(Configuration config) { - Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); - int sum = 0; - for(Integer i : ints) { - sum += i; - } - f2Replace = sum; - } - - @Override - public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) - throws Exception { - out.setFields(f2Replace, value.f1, value.f2); - return out; - } - } - - static final String testKey = "testVariable"; - static final int testValue = 666; - - @Test - public void testPassingConfigurationObject() throws Exception { - /* - * Test passing configuration object. - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); - Configuration conf = new Configuration(); - conf.setInteger(testKey, testValue); - DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds. 
- map(new RichMapper2()).withParameters(conf); - List<Tuple3<Integer, Long, String>> result = bcMapDs.collect(); - - String expected = "1,1,Hi\n" - + "2,2,Hello\n" - + "3,2,Hello world"; - - compareResultAsTuples(result, expected); - } - - public static class RichMapper2 extends RichMapFunction<Tuple3<Integer,Long,String>, - Tuple3<Integer, Long,String>> { - private static final long serialVersionUID = 1L; - - @Override - public void open(Configuration config) { - int val = config.getInteger(testKey, -1); - Assert.assertEquals(testValue, val); - } - - @Override - public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) { - return value; - } - } -}
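
For context on the MapITCase diff above: its final test verifies that a Configuration attached via withParameters(conf) becomes visible inside a rich function's open() method. A minimal, self-contained sketch of that pattern against the public DataSet API of the time (the class name and sample data are illustrative; the key and value match the deleted test's constants):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class WithParametersSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration conf = new Configuration();
        conf.setInteger("testVariable", 666); // same key and value as the deleted test

        DataSet<String> mapped = env.fromElements("Hi", "Hello")
                .map(new RichMapFunction<String, String>() {
                    private int val;

                    @Override
                    public void open(Configuration parameters) {
                        // the Configuration handed to withParameters() arrives
                        // here as `parameters` before the first map() call
                        val = parameters.getInteger("testVariable", -1);
                    }

                    @Override
                    public String map(String value) {
                        return value + "/" + val;
                    }
                })
                .withParameters(conf);

        mapped.print(); // print() collects the result and triggers execution
    }
}

The deleted RichMapper2 asserts on the value inside open(); this sketch instead threads it into the output, which makes the same mechanism visible without a test harness.
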
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java deleted file mode 100644 index cc895c2..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.javaApiOperators; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.functions.MapPartitionFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.util.Collector; - -@SuppressWarnings("serial") -public class MapPartitionITCase extends JavaProgramTestBase { - - private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" - + "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" - + "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n"; - - private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n"; - - - private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>(); - - private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>(); - - private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>(); - - - @Override - protected void preSubmit() throws Exception { - - // create input - for (String s :IN.split("\n")) { - String[] fields = s.split(" "); - input.add(new Tuple2<String, String>(fields[0], fields[1])); - } - - // create expected - for (String s : RESULT.split("\n")) { - String[] fields = s.split(" "); - expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1]))); - } - - } - - @Override - protected void postSubmit() { - compareResultCollections(expected, result, new TestBaseUtils.TupleComparator<Tuple2<String, Integer>>()); - } - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<String, String>> data = env.fromCollection(input); - - data.mapPartition(new TestMapPartition()).output(new 
LocalCollectionOutputFormat<Tuple2<String,Integer>>(result)); - - env.execute(); - } - - - public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> { - - @Override - public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, Integer>> out) { - for (Tuple2<String, String> value : values) { - String keyString = value.f0; - String valueString = value.f1; - - int keyInt = Integer.parseInt(keyString); - int valueInt = Integer.parseInt(valueString); - - if (keyInt + valueInt < 10) { - out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10)); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java deleted file mode 100644 index 63abe63..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.javaApiOperators; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.*; - -/** - * These check whether the object-reuse execution mode does really reuse objects. 
- */ -@SuppressWarnings("serial" ) -@RunWith(Parameterized.class) -public class ObjectReuseITCase extends MultipleProgramsTestBase { - - private static final List<Tuple2<String, Integer>> REDUCE_DATA = - Arrays.asList( - new Tuple2<>("a", 1), new Tuple2<>("a", 2), - new Tuple2<>("a", 3), new Tuple2<>("a", 4), - new Tuple2<>("a", 50)); - - private static final List<Tuple2<String, Integer>> GROUP_REDUCE_DATA = - Arrays.asList( - new Tuple2<>("a", 1), new Tuple2<>("a", 2), - new Tuple2<>("a", 3), new Tuple2<>("a", 4), - new Tuple2<>("a", 5)); - - - private final boolean objectReuse; - - public ObjectReuseITCase(boolean objectReuse) { - super(TestExecutionMode.CLUSTER); - this.objectReuse = objectReuse; - } - - @Test - public void testKeyedReduce() throws Exception { - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (objectReuse) { - env.getConfig().enableObjectReuse(); - } else { - env.getConfig().disableObjectReuse(); - } - - DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA); - - DataSet<Tuple2<String, Integer>> result = input - .groupBy(0) - .reduce(new ReduceFunction<Tuple2<String, Integer>>() { - - @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { - value2.f1 += value1.f1; - return value2; - } - }); - - Tuple2<String, Integer> res = result.collect().get(0); - assertEquals(new Tuple2<>("a", 60), res); - } - - @Test - public void testGlobalReduce() throws Exception { - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (objectReuse) { - env.getConfig().enableObjectReuse(); - } else { - env.getConfig().disableObjectReuse(); - } - - DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA); - - DataSet<Tuple2<String, Integer>> result = input.reduce( - new ReduceFunction<Tuple2<String, Integer>>() { - - @Override - public Tuple2<String, Integer> reduce( - Tuple2<String, Integer> value1, - Tuple2<String, Integer> value2) { - - if (value1.f1 % 3 == 0) { - value1.f1 += value2.f1; - return value1; - } else { - value2.f1 += value1.f1; - return value2; - } - } - - }); - - Tuple2<String, Integer> res = result.collect().get(0); - assertEquals(new Tuple2<>("a", 60), res); - } - - @Test - public void testKeyedGroupReduce() throws Exception { - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (objectReuse) { - env.getConfig().enableObjectReuse(); - } else { - env.getConfig().disableObjectReuse(); - } - - DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA); - - DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduceGroup( - new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { - - @Override - public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) { - List<Tuple2<String, Integer>> list = new ArrayList<>(); - for (Tuple2<String, Integer> val : values) { - list.add(val); - } - - for (Tuple2<String, Integer> val : list) { - out.collect(val); - } - } - }); - - List<Tuple2<String, Integer>> is = result.collect(); - Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>()); - - List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ? 
- Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4), - new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) : - Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), - new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5)); - - assertEquals(expected, is); - } - - @Test - public void testGlobalGroupReduce() throws Exception { - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if (objectReuse) { - env.getConfig().enableObjectReuse(); - } else { - env.getConfig().disableObjectReuse(); - } - - DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA); - - DataSet<Tuple2<String, Integer>> result = input.reduceGroup( - new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { - - @Override - public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) { - List<Tuple2<String, Integer>> list = new ArrayList<>(); - for (Tuple2<String, Integer> val : values) { - list.add(val); - } - - for (Tuple2<String, Integer> val : list) { - out.collect(val); - } - } - }); - - List<Tuple2<String, Integer>> is = result.collect(); - Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>()); - - List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ? - Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4), - new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) : - Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), - new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5)); - - assertEquals(expected, is); - } - - @Parameterized.Parameters(name = "Execution mode = CLUSTER, Reuse = {0}") - public static Collection<Object[]> executionModes() { - return Arrays.asList( - new Object[] { false, }, - new Object[] { true } ); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java deleted file mode 100644 index 5215a36..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java +++ /dev/null @@ -1,680 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.javaApiOperators; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.RichFlatJoinFunction; -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Collection; -import java.util.List; - -@SuppressWarnings("serial") -@RunWith(Parameterized.class) -public class OuterJoinITCase extends MultipleProgramsTestBase { - - public OuterJoinITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testLeftOuterJoin1() throws Exception { - testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE); - } - - @Test - public void testLeftOuterJoin2() throws Exception { - testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST); - } - - @Test - public void testLeftOuterJoin3() throws Exception { - testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND); - } - - @Test - public void testLeftOuterJoin4() throws Exception { - testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND); - } - - @Test (expected = InvalidProgramException.class) - public void testLeftOuterJoin5() throws Exception { - testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST); - } - - private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception { - /* - * UDF Join on tuples with key field positions - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.leftOuterJoin(ds2, hint) - .where(0) - .equalTo(0) - .with(new T3T5FlatJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "Hi,Hallo\n" + - "Hello,Hallo Welt\n" + - "Hello,Hallo Welt wie\n" + - "Hello world,null\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testRightOuterJoin1() throws Exception { - testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE); - } - - @Test - public void testRightOuterJoin2() throws Exception { - testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST); - } - - @Test - public void testRightOuterJoin3() throws Exception { - testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND); - } - - @Test - 
public void testRightOuterJoin4() throws Exception { - testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST); - } - - @Test (expected = InvalidProgramException.class) - public void testRightOuterJoin5() throws Exception { - testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND); - } - - private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception { - /* - * UDF Join on tuples with key field positions - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.rightOuterJoin(ds2, hint) - .where(1) - .equalTo(1) - .with(new T3T5FlatJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "Hi,Hallo\n" + - "Hello,Hallo Welt\n" + - "null,Hallo Welt wie\n" + - "Hello world,Hallo Welt\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testFullOuterJoin1() throws Exception { - testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE); - } - - @Test - public void testFullOuterJoin2() throws Exception { - testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST); - } - - @Test - public void testFullOuterJoin3() throws Exception { - testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND); - } - - @Test (expected = InvalidProgramException.class) - public void testFullOuterJoin4() throws Exception { - testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST); - } - - @Test (expected = InvalidProgramException.class) - public void testFullOuterJoin5() throws Exception { - testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND); - } - - private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception { - /* - * UDF Join on tuples with key field positions - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.fullOuterJoin(ds2, hint) - .where(0) - .equalTo(2) - .with(new T3T5FlatJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "null,Hallo\n" + - "Hi,Hallo Welt\n" + - "Hello,Hallo Welt wie\n" + - "Hello world,null\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinOnTuplesWithCompositeKeyPositions() throws Exception { - /* - * UDF Join on tuples with multiple key field positions - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.fullOuterJoin(ds2) - .where(0, 1) - .equalTo(0, 4) - .with(new T3T5FlatJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "Hi,Hallo\n" + - "Hello,Hallo Welt\n" + - "Hello world,null\n" + - "null,Hallo Welt wie\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void 
testJoinWithBroadcastSet() throws Exception { - /* - * Join with broadcast set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple3<String, String, Integer>> joinDs = - ds1.fullOuterJoin(ds2) - .where(1) - .equalTo(4) - .with(new T3T5BCJoin()) - .withBroadcastSet(intDs, "ints"); - - List<Tuple3<String, String, Integer>> result = joinDs.collect(); - - String expected = "Hi,Hallo,55\n" + - "Hi,Hallo Welt wie,55\n" + - "Hello,Hallo Welt,55\n" + - "Hello world,Hallo Welt,55\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinWithMixedKeyTypes1() throws Exception { - /* - * Join on a tuple input with key field selector and a custom type input with key extractor - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.fullOuterJoin(ds2) - .where(new KeySelector1()) - .equalTo(0) - .with(new CustT3Join()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "Hi,Hi\n" + - "Hello,Hello\n" + - "Hello world,Hello\n" + - "null,Hello world\n"; - - compareResultAsTuples(result, expected); - - } - - public static class KeySelector1 implements KeySelector<CustomType, Integer> { - @Override - public Integer getKey(CustomType value) { - return value.myInt; - } - } - - - @Test - public void testJoinWithMixedKeyTypes2() - throws Exception { - /* - * Join on a tuple input with key field selector and a custom type input with key extractor - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.fullOuterJoin(ds2) - .where(1) - .equalTo(new KeySelector2()) - .with(new T3CustJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "null,Hi\n" + - "Hi,Hello\n" + - "Hello,Hello world\n" + - "Hello world,Hello world\n"; - - compareResultAsTuples(result, expected); - } - - public static class KeySelector2 implements KeySelector<CustomType, Long> { - @Override - public Long getKey(CustomType value) { - return value.myLong; - } - } - - @Test - public void testJoinWithTupleReturningKeySelectors() throws Exception { - /* - * UDF Join on tuples with tuple-returning key selectors - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.fullOuterJoin(ds2) - .where(new KeySelector3()) //0, 1 - .equalTo(new KeySelector4()) // 0, 4 - .with(new T3T5FlatJoin()); - - List<Tuple2<String, String>> result = joinDs.collect(); - - String expected = "Hi,Hallo\n" + - "Hello,Hallo Welt\n" + - "Hello world,null\n" + - "null,Hallo 
Welt wie\n"; - - compareResultAsTuples(result, expected); - } - - public static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) { - return new Tuple2<>(t.f0, t.f1); - } - } - - public static class KeySelector4 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) { - return new Tuple2<>(t.f0, t.f4); - } - } - - @Test - public void testJoinWithNestedKeyExpression1() throws Exception { - /* - * Join nested pojo against tuple (selected using a string) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("nestedPojo.longNumber") - .equalTo("f6") - .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>()); - - List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinWithNestedKeyExpression2() throws Exception { - /* - * Join nested pojo against tuple (selected as an integer) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("nestedPojo.longNumber") - .equalTo(6) // <--- difference! 
- .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>()); - - List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinWithCompositeKeyExpressions() throws Exception { - /* - * selecting multiple fields using expression language - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("nestedPojo.longNumber", "number", "str") - .equalTo("f6", "f0", "f1") - .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>()); - - env.setParallelism(1); - List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testNestedIntoTuple() throws Exception { - /* - * nested into tuple - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0") - .equalTo("f6", "f0", "f2") - .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>()); - - env.setParallelism(1); - List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testNestedIntoTupleIntoPojo() throws Exception { - /* - * nested into tuple into pojo - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); - DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("nestedTupleWithCustom.f0", "nestedTupleWithCustom.f1.myInt", "nestedTupleWithCustom.f1.myLong") - .equalTo("f2", "f3", "f4") - .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, 
Long>>()); - - env.setParallelism(1); - List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + - "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + - "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testNonPojoToVerifyFullTupleKeys() throws Exception { - /* - * Non-POJO test to verify that full-tuple keys are working. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joinDs = - ds1.fullOuterJoin(ds2) - .where(0) - .equalTo("f0.f0", "f0.f1") // key is now Tuple2<Integer, Integer> - .with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>()); - - env.setParallelism(1); - List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect(); - - String expected = "((1,1),one),((1,1),one)\n" + - "((2,2),two),((2,2),two)\n" + - "((3,3),three),((3,3),three)\n"; - - compareResultAsTuples(result, expected); - - } - - @Test - public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception { - /* - * Non-POJO test to verify "nested" tuple-element selection. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); - DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("f0.f0") - .equalTo("f0.f0") // key is now Integer from Tuple2<Integer, Integer> - .with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>()); - - env.setParallelism(1); - List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect(); - - String expected = "((1,1),one),((1,1),one)\n" + - "((2,2),two),((2,2),two)\n" + - "((3,3),three),((3,3),three)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testFullPojoWithFullTuple() throws Exception { - /* - * full pojo with full tuple - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); - DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env); - DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> joinDs = - ds1.fullOuterJoin(ds2) - .where("*") - .equalTo("*") - .with(new ProjectBothFunction<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>()); - - env.setParallelism(1); - List<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> result = joinDs.collect(); - - String expected = "1 First (10,100,1000,One) 
10000,(10000,10,100,1000,One,1,First)\n" + - "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" + - "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinWithAtomicType1() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet<Integer> ds2 = env.fromElements(1, 2); - - DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joinDs = ds1 - .fullOuterJoin(ds2) - .where(0) - .equalTo("*") - .with(new ProjectBothFunction<Tuple3<Integer, Long, String>, Integer>()) - .returns("Tuple2<java.lang.Object,java.lang.Object>"); - - List<Tuple2<Tuple3<Integer, Long, String>, Integer>> result = joinDs.collect(); - - String expected = "(1,1,Hi),1\n" + - "(2,2,Hello),2\n" + - "(3,2,Hello world),null\n"; - - compareResultAsTuples(result, expected); - } - - @Test - public void testJoinWithAtomicType2() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> ds1 = env.fromElements(1, 2); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - - DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joinDs = ds1 - .fullOuterJoin(ds2) - .where("*") - .equalTo(0) - .with(new ProjectBothFunction<Integer, Tuple3<Integer, Long, String>>()) - .returns("Tuple2<java.lang.Object,java.lang.Object>"); - - - List<Tuple2<Integer, Tuple3<Integer, Long, String>>> result = joinDs.collect(); - - String expected = "1,(1,1,Hi)\n" + - "2,(2,2,Hello)\n" + - "null,(3,2,Hello world)\n"; - - compareResultAsTuples(result, expected); - } - - public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> { - - @Override - public void join(Tuple3<Integer, Long, String> first, - Tuple5<Integer, Long, Integer, String, Long> second, - Collector<Tuple2<String, String>> out) { - - out.collect(new Tuple2<>(first == null ? null : first.f2, second == null ? null : second.f3)); - } - - } - - public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> { - - private int broadcast; - - @Override - public void open(Configuration config) { - Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); - int sum = 0; - for (Integer i : ints) { - sum += i; - } - broadcast = sum; - } - - @Override - public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, - Collector<Tuple3<String, String, Integer>> out) throws Exception { - out.collect(new Tuple3<>(first == null ? null : first.f2, second == null ? null : second.f3, broadcast)); - } - } - - public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> { - - @Override - public Tuple2<String, String> join(Tuple3<Integer, Long, String> first, - CustomType second) { - - return new Tuple2<>(first == null ? null : first.f2, second == null ? 
null : second.myString); - } - } - - public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> { - - @Override - public Tuple2<String, String> join(CustomType first, Tuple3<Integer, Long, String> second) { - - return new Tuple2<>(first == null ? null : first.myString, second == null ? null : second.f2); - } - } - - /** - * Deliberately untyped join function, which emits a Tuple2 of the left and right side. - */ - public static class ProjectBothFunction<IN1, IN2> implements JoinFunction<IN1, IN2, Tuple2<IN1, IN2>> { - @Override - public Tuple2<IN1, IN2> join(IN1 first, IN2 second) throws Exception { - return new Tuple2<>(first, second); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java deleted file mode 100644 index 85d70e3..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java +++ /dev/null @@ -1,848 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.javaApiOperators; - -import java.io.Serializable; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.MapPartitionFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.AggregateOperator; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -@SuppressWarnings("serial") -public class PartitionITCase extends MultipleProgramsTestBase { - - public PartitionITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testHashPartitionByKeyField() throws Exception { - /* - * Test hash partition by key field - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Long> uniqLongs = ds - .partitionByHash(1) - .mapPartition(new UniqueTupleLongMapper()); - List<Long> result = uniqLongs.collect(); - - String expected = "1\n" + - "2\n" + - "3\n" + - "4\n" + - "5\n" + - "6\n"; - - compareResultAsText(result, expected); - } - - @Test - public void testRangePartitionByKeyField() throws Exception { - /* - * Test range partition by key field - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Long> uniqLongs = ds - .partitionByRange(1) - .mapPartition(new UniqueTupleLongMapper()); - List<Long> result = uniqLongs.collect(); - - String expected = "1\n" + - "2\n" + - "3\n" + - "4\n" + - "5\n" + - "6\n"; - - compareResultAsText(result, expected); - } - - @Test - public void testHashPartitionByKeyField2() throws Exception { - /* - * Test hash partition by key field - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - AggregateOperator<Tuple3<Integer, Long, String>> sum = ds - .map(new PrefixMapper()) - .partitionByHash(1, 2) - .groupBy(1, 2) - .sum(0); - - List<Tuple3<Integer, Long, String>> result = sum.collect(); - - String expected = "(1,1,Hi)\n" + - "(5,2,Hello)\n" + - "(4,3,Hello)\n" + - "(5,3,I am )\n" + - "(6,3,Luke )\n" + - "(34,4,Comme)\n" + - "(65,5,Comme)\n" + - "(111,6,Comme)"; - - 
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionByKeyField2() throws Exception {
-		/*
-		 * Test range partition by multiple key fields
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		AggregateOperator<Tuple3<Integer, Long, String>> sum = ds
-				.map(new PrefixMapper())
-				.partitionByRange(1, 2)
-				.groupBy(1, 2)
-				.sum(0);
-
-		List<Tuple3<Integer, Long, String>> result = sum.collect();
-
-		String expected = "(1,1,Hi)\n" +
-				"(5,2,Hello)\n" +
-				"(4,3,Hello)\n" +
-				"(5,3,I am )\n" +
-				"(6,3,Luke )\n" +
-				"(34,4,Comme)\n" +
-				"(65,5,Comme)\n" +
-				"(111,6,Comme)";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionOfAtomicType() throws Exception {
-		/*
-		 * Test hash partition of atomic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Long> uniqLongs = env.generateSequence(1, 6)
-				.union(env.generateSequence(1, 6))
-				.rebalance()
-				.partitionByHash("*")
-				.mapPartition(new UniqueLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionOfAtomicType() throws Exception {
-		/*
-		 * Test range partition of atomic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Long> uniqLongs = env.generateSequence(1, 6)
-				.union(env.generateSequence(1, 6))
-				.rebalance()
-				.partitionByRange("*")
-				.mapPartition(new UniqueLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionByKeySelector() throws Exception {
-		/*
-		 * Test hash partition by key selector
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash(new KeySelector1())
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	private static class PrefixMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) throws Exception {
-			if (value.f2.length() > 5) {
-				value.f2 = value.f2.substring(0, 5);
-			}
-			return value;
-		}
-	}
-
-	@Test
-	public void testRangePartitionByKeySelector() throws Exception {
-		/*
-		 * Test range partition by key selector
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByRange(new KeySelector1())
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
-			return value.f1;
-		}
-
-	}
-
-	@Test
-	public void testForcedRebalancing() throws Exception {
-		/*
-		 * Test forced rebalancing
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// generate some numbers in parallel
-		DataSet<Long> ds = env.generateSequence(1, 3000);
-		DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
-				// introduce some partition skew by filtering
-				.filter(new Filter1())
-				// rebalance
-				.rebalance()
-				// count values in each partition
-				.map(new PartitionIndexMapper())
-				.groupBy(0)
-				.reduce(new Reducer1())
-				// round counts to mitigate runtime scheduling effects (lazy split assignment)
-				.map(new Mapper1());
-
-		List<Tuple2<Integer, Integer>> result = uniqLongs.collect();
-
-		StringBuilder expected = new StringBuilder();
-		int numPerPartition = 2220 / env.getParallelism() / 10;
-		for (int i = 0; i < env.getParallelism(); i++) {
-			expected.append('(').append(i).append(',')
-					.append(numPerPartition).append(")\n");
-		}
-
-		compareResultAsText(result, expected.toString());
-	}
-
-	public static class Filter1 implements FilterFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Long value) throws Exception {
-			return value > 780;
-		}
-	}
-
-	public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
-			return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
-		}
-	}
-
-	public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-			value.f1 = (value.f1 / 10);
-			return value;
-		}
-
-	}
-
-	@Test
-	public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
-		/*
-		 * Test hash partition by key field and different parallelism
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash(1).setParallelism(4)
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionByKeyFieldAndDifferentParallelism() throws Exception {
-		/*
-		 * Test range partition by key field and different parallelism
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByRange(1).setParallelism(4)
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionWithKeyExpression() throws Exception {
-		/*
-		 * Test hash partition with key expression
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash("nestedPojo.longNumber").setParallelism(4)
-				.mapPartition(new UniqueNestedPojoLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "10000\n" +
-				"20000\n" +
-				"30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionWithKeyExpression() throws Exception {
-		/*
-		 * Test range partition with key expression
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByRange("nestedPojo.longNumber").setParallelism(4)
-				.mapPartition(new UniqueNestedPojoLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "10000\n" +
-				"20000\n" +
-				"30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class UniqueTupleLongMapper implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<Tuple3<Integer, Long, String>> records, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (Tuple3<Integer, Long, String> t : records) {
-				uniq.add(t.f1);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
-	public static class UniqueLongMapper implements MapPartitionFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<Long> longs, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (Long l : longs) {
-				uniq.add(l);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
-	public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<POJO> records, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (POJO t : records) {
-				uniq.add(t.nestedPojo.longNumber);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
-	public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Long value) throws Exception {
-			return new Tuple2<>(this.getRuntimeContext().getIndexOfThisSubtask(), 1);
-		}
-	}
-
-	@Test
-	public void testRangePartitionerOnSequenceData() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSource<Long> dataSource = env.generateSequence(0, 10000);
-		KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();
-
-		MapPartitionFunction<Long, Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(new LongComparator(true));
-
-		Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true));
-
-		List<Tuple2<Long, Long>> collected = dataSource.partitionByRange(keyExtractor).mapPartition(minMaxSelector).collect();
-		Collections.sort(collected, tuple2Comparator);
-
-		long previousMax = -1;
-		for (Tuple2<Long, Long> tuple2 : collected) {
-			if (previousMax == -1) {
-				previousMax = tuple2.f1;
-			} else {
-				long currentMin = tuple2.f0;
-				assertTrue(tuple2.f0 < tuple2.f1);
-				assertEquals(previousMax + 1, currentMin);
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test(expected = InvalidProgramException.class)
-	public void testRangePartitionInIteration() throws Exception {
-
-		// does not apply for collection execution
-		if (super.mode == TestExecutionMode.COLLECTION) {
-			throw new InvalidProgramException("Does not apply for collection execution");
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSource<Long> source = env.generateSequence(0, 10000);
-
-		DataSet<Tuple2<Long, String>> tuples = source.map(new MapFunction<Long, Tuple2<Long, String>>() {
-			@Override
-			public Tuple2<Long, String> map(Long v) throws Exception {
-				return new Tuple2<>(v, Long.toString(v));
-			}
-		});
-
-		DeltaIteration<Tuple2<Long, String>, Tuple2<Long, String>> it = tuples.iterateDelta(tuples, 10, 0);
-		DataSet<Tuple2<Long, String>> body = it.getWorkset()
-				.partitionByRange(1) // Verify that range partition is not allowed in iteration
-				.join(it.getSolutionSet())
-				.where(0).equalTo(0).projectFirst(0).projectSecond(1);
-		DataSet<Tuple2<Long, String>> result = it.closeWith(body, body);
-
-		result.collect(); // should fail
-	}
-
-
-
-	@Test
-	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
-					@Override
-					public Tuple2<Long, Long> map(Long value) throws Exception {
-						return new Tuple2<>(value / 5000, value % 5000);
-					}
-				});
-
-		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
-				new LongComparator(false));
-
-		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
-
-		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
-				.withOrders(Order.ASCENDING, Order.DESCENDING)
-				.mapPartition(minMaxSelector)
-				.collect();
-
-		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
-
-		Tuple2<Long, Long> previousMax = null;
-		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = tuple2.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						tuple2Comparator.compare(previousMax, tuple2.f0) < 0);
-				if (previousMax.f0.equals(tuple2.f0.f0)) {
-					// check that ordering on the second key is correct
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.f1 - 1, tuple2.f0.f1.longValue());
-				}
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test
-	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
-					@Override
-					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
-						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
-					}
-				});
-
-		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
-				new LongComparator(true));
-		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
-
-		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
-				.withOrders(Order.ASCENDING)
-				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long, Long>, Long>, Tuple2<Long, Long>>() {
-					@Override
-					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
-							Collector<Tuple2<Long, Long>> out) throws Exception {
-						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
-							out.collect(value.f0);
-						}
-					}
-				})
-				.mapPartition(minMaxSelector)
-				.collect();
-
-		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
-
-		Tuple2<Long, Long> previousMax = null;
-		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = tuple2.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						tuple2Comparator.compare(previousMax, tuple2.f0) < 0);
-				if (previousMax.f0.equals(tuple2.f0.f0)) {
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.f1 + 1, tuple2.f0.f1.longValue());
-				}
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test
-	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
-					@Override
-					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
-						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
-					}
-				});
-
-		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
-				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
-					@Override
-					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
-						return value.f0;
-					}
-				})
-				.withOrders(Order.ASCENDING)
-				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
-				.mapPartition(new ExtractComparablePojo())
-				.collect();
-
-		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
-				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
-					@Override
-					public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
-							Tuple2<ComparablePojo, ComparablePojo> o2) {
-						return o1.f0.compareTo(o2.f0);
-					}
-				};
-		Collections.sort(collected, pojoComparator);
-
-		ComparablePojo previousMax = null;
-		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					element.f0.compareTo(element.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = element.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						previousMax.compareTo(element.f0) < 0);
-				if (previousMax.first.equals(element.f0.first)) {
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.second - 1, element.f0.second.longValue());
-				}
-				previousMax = element.f1;
-			}
-		}
-	}
-
-	private static class ExtractComparablePojo implements MapPartitionFunction<
-			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
-			Tuple2<ComparablePojo, ComparablePojo>> {
-
-		@Override
-		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
-				Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
-			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
-				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
-			}
-		}
-	}
-
-	private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
-
-		@Override
-		public int compare(Tuple2<ComparablePojo, Long> o1,
-				Tuple2<ComparablePojo, Long> o2) {
-			return o1.f0.compareTo(o2.f0);
-		}
-	}
-
-	private static class ComparablePojo implements Comparable<ComparablePojo> {
-		private Long first;
-		private Long second;
-
-		public Long getFirst() {
-			return first;
-		}
-
-		public void setFirst(Long first) {
-			this.first = first;
-		}
-
-		public Long getSecond() {
-			return second;
-		}
-
-		public void setSecond(Long second) {
-			this.second = second;
-		}
-
-		public ComparablePojo(Long first,
-				Long second) {
-			this.first = first;
-			this.second = second;
-		}
-
-		public ComparablePojo() {
-		}
-
-		@Override
-		public int compareTo(ComparablePojo o) {
-			final int firstResult = Long.compare(this.first, o.first);
-			if (firstResult == 0) {
-				return (-1) * Long.compare(this.second, o.second);
-			}
-
-			return firstResult;
-		}
-	}
-
-	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
-		@Override
-		public Long getKey(Long value) throws Exception {
-			return value;
-		}
-	}
-
-	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
-
-		private final Comparator<T> comparator;
-
-		public MinMaxSelector(Comparator<T> comparator) {
-			this.comparator = comparator;
-		}
-
-		@Override
-		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
-			Iterator<T> itr = values.iterator();
-			T min = itr.next();
-			T max = min;
-			T value;
-			while (itr.hasNext()) {
-				value = itr.next();
-				if (comparator.compare(value, min) < 0) {
-					min = value;
-				}
-				if (comparator.compare(value, max) > 0) {
-					max = value;
-				}
-			}
-
-			Tuple2<T, T> result = new Tuple2<>(min, max);
-			out.collect(result);
-		}
-	}
-
-	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
-
-		private final Comparator<T> firstComparator;
-		private final Comparator<T> secondComparator;
-
-		public Tuple2Comparator(Comparator<T> comparator) {
-			this(comparator, comparator);
-		}
-
-		public Tuple2Comparator(Comparator<T> firstComparator,
-				Comparator<T> secondComparator) {
-			this.firstComparator = firstComparator;
-			this.secondComparator = secondComparator;
-		}
-
-		@Override
-		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
-			long result = firstComparator.compare(first.f0, second.f0);
-			if (result > 0) {
-				return 1;
-			} else if (result < 0) {
-				return -1;
-			}
-
-			result = secondComparator.compare(first.f1, second.f1);
-			if (result > 0) {
-				return 1;
-			} else if (result < 0) {
-				return -1;
-			}
-
-			return 0;
-		}
-	}
-
-	private static class LongComparator implements Comparator<Long>, Serializable {
-
-		private final boolean ascending;
-
-		public LongComparator(boolean ascending) {
-			this.ascending = ascending;
-		}
-
-		@Override
-		public int compare(Long o1, Long o2) {
-			if (ascending) {
-				return Long.compare(o1, o2);
-			} else {
-				return (-1) * Long.compare(o1, o2);
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
deleted file mode 100644
index 1054c62..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-public class ProjectITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		/*
-		 * Projection with tuple field indexes
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<String, Long, Integer>> projDs = ds.
-				project(3, 4, 2);
-		List<Tuple3<String, Long, Integer>> result = projDs.collect();
-
-		String expectedResult = "Hallo,1,0\n" +
-				"Hallo Welt,2,1\n" +
-				"Hallo Welt wie,1,2\n" +
-				"Hallo Welt wie gehts?,2,3\n" +
-				"ABC,2,4\n" +
-				"BCD,3,5\n" +
-				"CDE,2,6\n" +
-				"DEF,1,7\n" +
-				"EFG,1,8\n" +
-				"FGH,2,9\n" +
-				"GHI,1,10\n" +
-				"HIJ,3,11\n" +
-				"IJK,3,12\n" +
-				"JKL,2,13\n" +
-				"KLM,2,14\n";
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-}
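
For readers skimming the removed PartitionITCase above, the invariant its range tests verify reduces to a short, self-contained sketch: range partitioning must assign disjoint, ordered key ranges to partitions. This is an illustrative reconstruction, not code from the commit; the class name RangePartitionSketch and its local names are hypothetical, and only DataSet API calls that appear in the deleted tests (generateSequence, partitionByRange with a KeySelector, mapPartition, collect) are assumed.

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class RangePartitionSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Range-partition a sequence on the value itself and emit each
		// partition's [min, max] span, mirroring the MinMaxSelector pattern
		// used by the deleted tests.
		List<Tuple2<Long, Long>> spans = env.generateSequence(0, 10000)
				.partitionByRange(new KeySelector<Long, Long>() {
					@Override
					public Long getKey(Long value) {
						return value;
					}
				})
				.mapPartition(new MapPartitionFunction<Long, Tuple2<Long, Long>>() {
					@Override
					public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long, Long>> out) {
						Long min = null;
						Long max = null;
						for (Long v : values) {
							if (min == null || v < min) {
								min = v;
							}
							if (max == null || v > max) {
								max = v;
							}
						}
						if (min != null) { // skip empty partitions
							out.collect(new Tuple2<>(min, max));
						}
					}
				})
				.collect();

		// Once sorted by their minimum, consecutive spans must not overlap:
		// each partition owns a disjoint slice of the key space.
		Collections.sort(spans, new Comparator<Tuple2<Long, Long>>() {
			@Override
			public int compare(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
				return Long.compare(a.f0, b.f0);
			}
		});
		for (int i = 1; i < spans.size(); i++) {
			if (spans.get(i).f0 <= spans.get(i - 1).f1) {
				throw new AssertionError("Partitions overlap.");
			}
		}
	}
}

The ordered variants of the tests (withOrders on partitionByRange) additionally check that, within equal leading keys, the secondary field advances without gaps across partition boundaries, which is what the "should be continuous" assertions above express.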