http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java new file mode 100644 index 0000000..2d6897b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.operators.util.CollectionDataSets.PojoWithDateAndEnum; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.Date; +import java.util.List; + +/** + * Integration tests for {@link ReduceFunction} and {@link RichReduceFunction}. + */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class ReduceITCase extends MultipleProgramsTestBase { + + public ReduceITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testReduceOnTuplesWithKeyFieldSelector() throws Exception { + /* + * Reduce on tuples with key field selector + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. 
+ groupBy(1).reduce(new Tuple3Reduce("B-)")); + + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); + + String expected = "1,1,Hi\n" + + "5,2,B-)\n" + + "15,3,B-)\n" + + "34,4,B-)\n" + + "65,5,B-)\n" + + "111,6,B-)\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception{ + /* + * Reduce on tuples with multiple key field selectors + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. + groupBy(4, 0).reduce(new Tuple5Reduce()); + + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); + + String expected = "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testReduceOnTuplesWithKeyExtractor() throws Exception { + /* + * Reduce on tuples with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. 
+ groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)")); + + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); + + String expected = "1,1,Hi\n" + + "5,2,B-)\n" + + "15,3,B-)\n" + + "34,4,B-)\n" + + "65,5,B-)\n" + + "111,6,B-)\n"; + + compareResultAsTuples(result, expected); + } + + private static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> { + private static final long serialVersionUID = 1L; + @Override + public Long getKey(Tuple3<Integer, Long, String> in) { + return in.f1; + } + } + + @Test + public void testReduceOnCustomTypeWithKeyExtractor() throws Exception { + /* + * Reduce on custom type with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> reduceDs = ds. + groupBy(new KeySelector2()).reduce(new CustomTypeReduce()); + + List<CustomType> result = reduceDs.collect(); + + String expected = "1,0,Hi\n" + + "2,3,Hello!\n" + + "3,12,Hello!\n" + + "4,30,Hello!\n" + + "5,60,Hello!\n" + + "6,105,Hello!\n"; + + compareResultAsText(result, expected); + } + + private static class KeySelector2 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testAllReduceForTuple() throws Exception { + /* + * All-reduce for tuple + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. 
+ reduce(new AllAddingTuple3Reduce()); + + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); + + String expected = "231,91,Hello World\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testAllReduceForCustomTypes() throws Exception { + /* + * All-reduce for custom types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> reduceDs = ds. + reduce(new AllAddingCustomTypeReduce()); + + List<CustomType> result = reduceDs.collect(); + + String expected = "91,210,Hello!"; + + compareResultAsText(result, expected); + } + + @Test + public void testReduceWithBroadcastSet() throws Exception { + /* + * Reduce with broadcast set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. 
+ groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints"); + + List<Tuple3<Integer, Long, String>> result = reduceDs.collect(); + + String expected = "1,1,Hi\n" + + "5,2,55\n" + + "15,3,55\n" + + "34,4,55\n" + + "65,5,55\n" + + "111,6,55\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testReduceATupleReturningKeySelector() throws Exception { + /* + * Reduce with a Tuple-returning KeySelector + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds + .groupBy(new KeySelector3()).reduce(new Tuple5Reduce()); + + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); + + String expected = "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); + } + + private static class KeySelector3 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + } + + @Test + public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception { + /* + * Case 2 with String-based field expression + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds + .groupBy("f4", "f0").reduce(new Tuple5Reduce()); + + List<Tuple5<Integer, Long, Integer, String, Long>> 
result = reduceDs + .collect(); + + String expected = "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testReduceOnTupleWithMultipleKeyExpressionsWithHashHint() throws Exception { + /* + * Case 2 with String-based field expression + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds + .groupBy("f4", "f0").reduce(new Tuple5Reduce()).setCombineHint(CombineHint.HASH); + + List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs + .collect(); + + String expected = "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testSupportForDataAndEnumSerialization() throws Exception { + /** + * Test support for Date and enum serialization + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0, 2).map(new Mapper1()); + ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env)); + + DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1()); + + List<String> result = res.collect(); + + String expected = "ok\nok"; + + compareResultAsText(result, expected); + } + + private static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> { + @Override + public PojoWithDateAndEnum map(Long value) throws Exception { + int l = value.intValue(); + switch (l) { + case 0: + 
PojoWithDateAndEnum one = new PojoWithDateAndEnum(); + one.group = "a"; + one.date = new Date(666); + one.cat = CollectionDataSets.Category.CAT_A; + return one; + case 1: + PojoWithDateAndEnum two = new PojoWithDateAndEnum(); + two.group = "a"; + two.date = new Date(666); + two.cat = CollectionDataSets.Category.CAT_A; + return two; + case 2: + PojoWithDateAndEnum three = new PojoWithDateAndEnum(); + three.group = "b"; + three.date = new Date(666); + three.cat = CollectionDataSets.Category.CAT_B; + return three; + } + throw new RuntimeException("Unexpected value for l=" + l); + } + } + + private static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<PojoWithDateAndEnum> values, + Collector<String> out) throws Exception { + for (PojoWithDateAndEnum val : values) { + if (val.cat == CollectionDataSets.Category.CAT_A) { + Assert.assertEquals("a", val.group); + } else if (val.cat == CollectionDataSets.Category.CAT_B) { + Assert.assertEquals("b", val.group); + } else { + Assert.fail("error. 
Cat = " + val.cat); + } + Assert.assertEquals(666, val.date.getTime()); + } + out.collect("ok"); + } + } + + private static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); + private final String f2Replace; + + public Tuple3Reduce() { + this.f2Replace = null; + } + + public Tuple3Reduce(String f2Replace) { + this.f2Replace = f2Replace; + } + + @Override + public Tuple3<Integer, Long, String> reduce( + Tuple3<Integer, Long, String> in1, + Tuple3<Integer, Long, String> in2) throws Exception { + + if (f2Replace == null) { + out.setFields(in1.f0 + in2.f0, in1.f1, in1.f2); + } else { + out.setFields(in1.f0 + in2.f0, in1.f1, this.f2Replace); + } + return out; + } + } + + private static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> { + private static final long serialVersionUID = 1L; + private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); + + @Override + public Tuple5<Integer, Long, Integer, String, Long> reduce( + Tuple5<Integer, Long, Integer, String, Long> in1, + Tuple5<Integer, Long, Integer, String, Long> in2) + throws Exception { + + out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); + return out; + } + } + + private static class CustomTypeReduce implements ReduceFunction<CustomType> { + private static final long serialVersionUID = 1L; + private final CustomType out = new CustomType(); + + @Override + public CustomType reduce(CustomType in1, CustomType in2) + throws Exception { + + out.myInt = in1.myInt; + out.myLong = in1.myLong + in2.myLong; + out.myString = "Hello!"; + return out; + } + } + + private static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private final Tuple3<Integer, Long, 
String> out = new Tuple3<Integer, Long, String>(); + + @Override + public Tuple3<Integer, Long, String> reduce( + Tuple3<Integer, Long, String> in1, + Tuple3<Integer, Long, String> in2) throws Exception { + + out.setFields(in1.f0 + in2.f0, in1.f1 + in2.f1, "Hello World"); + return out; + } + } + + private static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> { + private static final long serialVersionUID = 1L; + private final CustomType out = new CustomType(); + + @Override + public CustomType reduce(CustomType in1, CustomType in2) + throws Exception { + + out.myInt = in1.myInt + in2.myInt; + out.myLong = in1.myLong + in2.myLong; + out.myString = "Hello!"; + return out; + } + } + + private static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); + private String f2Replace = ""; + + @Override + public void open(Configuration config) { + + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + int sum = 0; + for (Integer i : ints) { + sum += i; + } + f2Replace = sum + ""; + + } + + @Override + public Tuple3<Integer, Long, String> reduce( + Tuple3<Integer, Long, String> in1, + Tuple3<Integer, Long, String> in2) throws Exception { + + out.setFields(in1.f0 + in2.f0, in1.f1, this.f2Replace); + return out; + } + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java new file mode 100644 index 0000000..c6d340a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceWithCombinerITCase.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.CombineFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.UnsortedGrouping; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +/** + * Integration tests for {@link GroupCombineFunction}. + */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class ReduceWithCombinerITCase extends MultipleProgramsTestBase { + + public ReduceWithCombinerITCase(TestExecutionMode mode) { + super(TestExecutionMode.CLUSTER); + } + + @Test + public void testReduceOnNonKeyedDataset() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple2<Integer, Boolean>> input = createNonKeyedInput(env); + List<Tuple2<Integer, Boolean>> actual = input.reduceGroup(new NonKeyedCombReducer()).collect(); + String expected = "10,true\n"; + + compareResultAsTuples(actual, expected); + } + + @Test + public void testForkingReduceOnNonKeyedDataset() throws Exception { + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple2<Integer, 
Boolean>> input = createNonKeyedInput(env); + + DataSet<Tuple2<Integer, Boolean>> r1 = input.reduceGroup(new NonKeyedCombReducer()); + DataSet<Tuple2<Integer, Boolean>> r2 = input.reduceGroup(new NonKeyedGroupCombReducer()); + + List<Tuple2<Integer, Boolean>> actual = r1.union(r2).collect(); + String expected = "10,true\n10,true\n"; + compareResultAsTuples(actual, expected); + } + + @Test + public void testReduceOnKeyedDataset() throws Exception { + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env); + List<Tuple3<String, Integer, Boolean>> actual = input.groupBy(0).reduceGroup(new KeyedCombReducer()).collect(); + String expected = "k1,6,true\nk2,4,true\n"; + + compareResultAsTuples(actual, expected); + } + + @Test + public void testReduceOnKeyedDatasetWithSelector() throws Exception { + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env); + + List<Tuple3<String, Integer, Boolean>> actual = input + .groupBy(new KeySelectorX()) + .reduceGroup(new KeyedCombReducer()) + .collect(); + String expected = "k1,6,true\nk2,4,true\n"; + + compareResultAsTuples(actual, expected); + } + + @Test + public void testForkingReduceOnKeyedDataset() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env); + + 
UnsortedGrouping<Tuple3<String, Integer, Boolean>> counts = input.groupBy(0); + + DataSet<Tuple3<String, Integer, Boolean>> r1 = counts.reduceGroup(new KeyedCombReducer()); + DataSet<Tuple3<String, Integer, Boolean>> r2 = counts.reduceGroup(new KeyedGroupCombReducer()); + + List<Tuple3<String, Integer, Boolean>> actual = r1.union(r2).collect(); + String expected = "k1,6,true\n" + + "k2,4,true\n" + + "k1,6,true\n" + + "k2,4,true\n"; + compareResultAsTuples(actual, expected); + } + + @Test + public void testForkingReduceOnKeyedDatasetWithSelection() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // creates the input data and distributes them evenly among the available downstream tasks + DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env); + + UnsortedGrouping<Tuple3<String, Integer, Boolean>> counts = input.groupBy(new KeySelectorX()); + + DataSet<Tuple3<String, Integer, Boolean>> r1 = counts.reduceGroup(new KeyedCombReducer()); + DataSet<Tuple3<String, Integer, Boolean>> r2 = counts.reduceGroup(new KeyedGroupCombReducer()); + + List<Tuple3<String, Integer, Boolean>> actual = r1.union(r2).collect(); + String expected = "k1,6,true\n" + + "k2,4,true\n" + + "k1,6,true\n" + + "k2,4,true\n"; + + compareResultAsTuples(actual, expected); + } + + private DataSet<Tuple2<Integer, Boolean>> createNonKeyedInput(ExecutionEnvironment env) { + return env.fromCollection(Arrays.asList( + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false), + new Tuple2<>(1, false)) + ).rebalance(); + } + + private static class NonKeyedCombReducer implements CombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>, + GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> { + + @Override + 
public Tuple2<Integer, Boolean> combine(Iterable<Tuple2<Integer, Boolean>> values) throws Exception { + int sum = 0; + boolean flag = true; + + for (Tuple2<Integer, Boolean> tuple : values) { + sum += tuple.f0; + flag &= !tuple.f1; + + } + return new Tuple2<>(sum, flag); + } + + @Override + public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception { + int sum = 0; + boolean flag = true; + for (Tuple2<Integer, Boolean> tuple : values) { + sum += tuple.f0; + flag &= tuple.f1; + } + out.collect(new Tuple2<>(sum, flag)); + } + } + + private static class NonKeyedGroupCombReducer implements GroupCombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>, + GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> { + + @Override + public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception { + int sum = 0; + boolean flag = true; + for (Tuple2<Integer, Boolean> tuple : values) { + sum += tuple.f0; + flag &= tuple.f1; + } + out.collect(new Tuple2<>(sum, flag)); + } + + @Override + public void combine(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception { + int sum = 0; + boolean flag = true; + for (Tuple2<Integer, Boolean> tuple : values) { + sum += tuple.f0; + flag &= !tuple.f1; + } + out.collect(new Tuple2<>(sum, flag)); + } + } + + private DataSet<Tuple3<String, Integer, Boolean>> createKeyedInput(ExecutionEnvironment env) { + return env.fromCollection(Arrays.asList( + new Tuple3<>("k1", 1, false), + new Tuple3<>("k1", 1, false), + new Tuple3<>("k1", 1, false), + new Tuple3<>("k2", 1, false), + new Tuple3<>("k1", 1, false), + new Tuple3<>("k1", 1, false), + new Tuple3<>("k2", 1, false), + new Tuple3<>("k2", 1, false), + new Tuple3<>("k1", 1, false), + new Tuple3<>("k2", 1, false)) + ).rebalance(); + } + + private static class KeySelectorX implements KeySelector<Tuple3<String, 
Integer, Boolean>, String> { + private static final long serialVersionUID = 1L; + @Override + public String getKey(Tuple3<String, Integer, Boolean> in) { + return in.f0; + } + } + + private class KeyedCombReducer implements CombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>, + GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> { + + @Override + public Tuple3<String, Integer, Boolean> combine(Iterable<Tuple3<String, Integer, Boolean>> values) throws Exception { + String key = null; + int sum = 0; + boolean flag = true; + + for (Tuple3<String, Integer, Boolean> tuple : values) { + key = (key == null) ? tuple.f0 : key; + sum += tuple.f1; + flag &= !tuple.f2; + } + return new Tuple3<>(key, sum, flag); + } + + @Override + public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception { + String key = null; + int sum = 0; + boolean flag = true; + + for (Tuple3<String, Integer, Boolean> tuple : values) { + key = (key == null) ? tuple.f0 : key; + sum += tuple.f1; + flag &= tuple.f2; + } + out.collect(new Tuple3<>(key, sum, flag)); + } + } + + private class KeyedGroupCombReducer implements GroupCombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>, + GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> { + + @Override + public void combine(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception { + String key = null; + int sum = 0; + boolean flag = true; + + for (Tuple3<String, Integer, Boolean> tuple : values) { + key = (key == null) ? 
tuple.f0 : key; + sum += tuple.f1; + flag &= !tuple.f2; + } + out.collect(new Tuple3<>(key, sum, flag)); + } + + @Override + public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception { + String key = null; + int sum = 0; + boolean flag = true; + + for (Tuple3<String, Integer, Boolean> tuple : values) { + key = (key == null) ? tuple.f0 : key; + sum += tuple.f1; + flag &= tuple.f2; + } + out.collect(new Tuple3<>(key, sum, flag)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java new file mode 100644 index 0000000..36eded6 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.io.GenericInputFormat; +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.runtime.minicluster.StandaloneMiniCluster; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Integration tests for {@link org.apache.flink.api.java.RemoteEnvironment}.
+ */ +@SuppressWarnings("serial") +public class RemoteEnvironmentITCase extends TestLogger { + + private static final int TM_SLOTS = 4; + + private static final int USER_DOP = 2; + + private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms"; + + private static final String VALID_STARTUP_TIMEOUT = "100 s"; + + private static Configuration configuration; + + private static StandaloneMiniCluster cluster; + + @BeforeClass + public static void setupCluster() throws Exception { + configuration = new Configuration(); + + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); + + cluster = new StandaloneMiniCluster(configuration); + } + + @AfterClass + public static void tearDownCluster() throws Exception { + if (cluster != null) { cluster.close(); } // cluster stays null if setupCluster() failed before assigning it + } + + /** + * Ensure that Akka configuration parameters are applied: an invalid startup timeout must make the job fail with a {@link FlinkException}. + */ + @Test(expected = FlinkException.class) + public void testInvalidAkkaConfiguration() throws Throwable { + Configuration config = new Configuration(); + config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + cluster.getHostname(), + cluster.getPort(), + config + ); + env.getConfig().disableSysoutLogging(); + + DataSet<String> result = env.createInput(new TestNonRichInputFormat()); + result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>())); + try { + env.execute(); + Assert.fail("Program should not run successfully, cause of invalid akka settings."); + } catch (ProgramInvocationException ex) { + throw ex.getCause(); + } + } + + /** + * Ensure that the program parallelism can be set even if the configuration is supplied.
+ */ + @Test + public void testUserSpecificParallelism() throws Exception { + Configuration config = new Configuration(); + config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + cluster.getHostname(), + cluster.getPort(), + config + ); + env.setParallelism(USER_DOP); + env.getConfig().disableSysoutLogging(); + + DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat()) + .rebalance() + .mapPartition(new RichMapPartitionFunction<Integer, Integer>() { + @Override + public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception { + out.collect(getRuntimeContext().getIndexOfThisSubtask()); + } + }); + List<Integer> resultCollection = result.collect(); + assertEquals(USER_DOP, resultCollection.size()); // one element per mapPartition subtask + } + + private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> { // asserts USER_DOP splits are requested; emits the value 1 once per instance + + private transient boolean emitted; + + @Override + public GenericInputSplit[] createInputSplits(int numSplits) throws IOException { + assertEquals(USER_DOP, numSplits); + return super.createInputSplits(numSplits); + } + + @Override + public boolean reachedEnd() { + return emitted; + } + + @Override + public Integer nextRecord(Integer reuse) { + if (emitted) { + return null; + } + emitted = true; + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java new file mode 100644 index 0000000..c023cf4 --- /dev/null +++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReplicatingDataSourceITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.io.ReplicatingInputFormat; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.ParallelIteratorInputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.NumberSequenceIterator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for data sources whose input format is wrapped in a {@link ReplicatingInputFormat}.
+ */ +@RunWith(Parameterized.class) +public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase { + + public ReplicatingDataSourceITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testReplicatedSourceToJoin() throws Exception { + /* + * Test replicated source going into join; the expected 500500 is the sum of 0..1000 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit> + (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO) + .map(new ToTuple()); + DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple()); + + DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0) + .projectFirst(0) + .sum(0); + + List<Tuple> result = pairs.collect(); + + String expectedResult = "(500500)"; + + compareResultAsText(result, expectedResult); + } + + @Test + public void testReplicatedSourceToCross() throws Exception { + /* + * Test replicated source going into cross; only pairs with equal values are kept, so the sum is again 500500 (0..1000) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit> + (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO) + .map(new ToTuple()); + DataSet<Tuple1<Long>> source2 = env.generateSequence(0L, 1000L).map(new ToTuple()); + + DataSet<Tuple1<Long>> pairs = source1.cross(source2) + .filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() { + @Override + public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception { + return value.f0.f0.equals(value.f1.f0); + } + }) + .map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() { + @Override + public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws 
Exception { + return value.f0; + } + }) + .sum(0); + +
List<Tuple1<Long>> result = pairs.collect(); + + String expectedResult = "(500500)"; + + compareResultAsText(result, expectedResult); + } + + private static class ToTuple implements MapFunction<Long, Tuple1<Long>> { + + @Override + public Tuple1<Long> map(Long value) throws Exception { + return new Tuple1<Long>(value); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java new file mode 100644 index 0000000..c0cc62a --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/SampleITCase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.FlatMapOperator; +import org.apache.flink.api.java.operators.MapPartitionOperator; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Integration tests for {@link DataSetUtils#sample} and {@link DataSetUtils#sampleWithSize}, with and without replacement. + */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class SampleITCase extends MultipleProgramsTestBase { + + private static final Random RNG = new Random(); // source of a fresh seed for each verification run + + public SampleITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void initiate() { + ExecutionEnvironment.getExecutionEnvironment().setParallelism(5); + } + + @Test + public void testSamplerWithFractionWithoutReplacement() throws Exception { + verifySamplerWithFractionWithoutReplacement(0d); + verifySamplerWithFractionWithoutReplacement(0.2d); + verifySamplerWithFractionWithoutReplacement(1.0d); + } + + @Test + public void testSamplerWithFractionWithReplacement() throws Exception { + verifySamplerWithFractionWithReplacement(0d); + verifySamplerWithFractionWithReplacement(0.2d); + verifySamplerWithFractionWithReplacement(1.0d); + verifySamplerWithFractionWithReplacement(2.0d); + } + + @Test + public void testSamplerWithSizeWithoutReplacement() 
throws Exception { + verifySamplerWithFixedSizeWithoutReplacement(0); + verifySamplerWithFixedSizeWithoutReplacement(2); +
verifySamplerWithFixedSizeWithoutReplacement(21); + } + + @Test + public void testSamplerWithSizeWithReplacement() throws Exception { + verifySamplerWithFixedSizeWithReplacement(0); + verifySamplerWithFixedSizeWithReplacement(2); + verifySamplerWithFixedSizeWithReplacement(21); + } + + private void verifySamplerWithFractionWithoutReplacement(double fraction) throws Exception { + verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong()); + } + + private void verifySamplerWithFractionWithoutReplacement(double fraction, long seed) throws Exception { + verifySamplerWithFraction(false, fraction, seed); + } + + private void verifySamplerWithFractionWithReplacement(double fraction) throws Exception { + verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong()); + } + + private void verifySamplerWithFractionWithReplacement(double fraction, long seed) throws Exception { + verifySamplerWithFraction(true, fraction, seed); + } + + private void verifySamplerWithFraction(boolean withReplacement, double fraction, long seed) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env); + MapPartitionOperator<String, String> sampled = DataSetUtils.sample(ds, withReplacement, fraction, seed); + List<String> result = sampled.collect(); + containsResultAsText(result, getSourceStrings()); // no size assertion here, unlike verifySamplerWithFixedSize + } + + private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples) throws Exception { + verifySamplerWithFixedSizeWithoutReplacement(numSamples, RNG.nextLong()); + } + + private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples, long seed) throws Exception { + verifySamplerWithFixedSize(false, numSamples, seed); + } + + private void verifySamplerWithFixedSizeWithReplacement(int numSamples) throws Exception { + verifySamplerWithFixedSizeWithReplacement(numSamples, RNG.nextLong()); + } + + 
private void
verifySamplerWithFixedSizeWithReplacement(int numSamples, long seed) throws Exception { + verifySamplerWithFixedSize(true, numSamples, seed); + } + + private void verifySamplerWithFixedSize(boolean withReplacement, int numSamples, long seed) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env); + DataSet<String> sampled = DataSetUtils.sampleWithSize(ds, withReplacement, numSamples, seed); + List<String> result = sampled.collect(); + assertEquals(numSamples, result.size()); + containsResultAsText(result, getSourceStrings()); + } + + private FlatMapOperator<Tuple3<Integer, Long, String>, String> getSourceDataSet(ExecutionEnvironment env) { + return CollectionDataSets.get3TupleDataSet(env).flatMap( + new FlatMapFunction<Tuple3<Integer, Long, String>, String>() { + @Override + public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception { + out.collect(value.f2); + } + }); + } + + private String getSourceStrings() { // NOTE(review): assumed to list every f2 value of CollectionDataSets.get3TupleDataSet — verify + return "Hi\n" + + "Hello\n" + + "Hello world\n" + + "Hello world, how are you?\n" + + "I am fine.\n" + + "Luke Skywalker\n" + + "Comment#1\n" + + "Comment#2\n" + + "Comment#3\n" + + "Comment#4\n" + + "Comment#5\n" + + "Comment#6\n" + + "Comment#7\n" + + "Comment#8\n" + + "Comment#9\n" + + "Comment#10\n" + + "Comment#11\n" + + "Comment#12\n" + + "Comment#13\n" + + "Comment#14\n" + + "Comment#15\n"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java new file mode 100644 index 0000000..a44f28c 
--- /dev/null +++
b/flink-tests/src/test/java/org/apache/flink/test/operators/SortPartitionITCase.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.POJO; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * Tests for {@link DataSet#sortPartition}: 
the output of every partition is checked to be in the requested order.
+ */ +@RunWith(Parameterized.class) +public class SortPartitionITCase extends MultipleProgramsTestBase { + + public SortPartitionITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testSortPartitionByKeyField() throws Exception { + /* + * Test sort partition on key field + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input + .sortPartition(1, Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple3Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionByTwoKeyFields() throws Exception { + /* + * Test sort partition on two key fields + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input + .sortPartition(4, Order.ASCENDING) + .sortPartition(2, Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple5Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSortPartitionByFieldExpression() throws Exception { + /* + * Test sort partition on field expression + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new
IdMapper()).setParallelism(4) // parallelize input + .sortPartition("f1", Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple3Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionByTwoFieldExpressions() throws Exception { + /* + * Test sort partition on two field expressions + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input + .sortPartition("f4", Order.ASCENDING) + .sortPartition("f2", Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple5Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionByNestedFieldExpression() throws Exception { + /* + * Test sort partition on nested field expressions + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input + .sortPartition("f0.f1", Order.ASCENDING) + .sortPartition("f1", Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new NestedTupleChecker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionPojoByNestedFieldExpression() throws Exception { + /* + * Test sort partition on nested POJO field expressions + */ + + final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + + DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<POJO>()).setParallelism(1) // single map instance, unlike the parallelized inputs of the other tests + .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING) + .sortPartition("number", Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new PojoChecker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionParallelismChange() throws Exception { + /* + * Test sort partition with parallelism change + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism + .mapPartition(new OrderCheckMapper<>(new Tuple3Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testSortPartitionWithKeySelector1() throws Exception { + /* + * Test sort partition on an extracted key, ascending + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input + .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Long>() { + @Override + public Long getKey(Tuple3<Integer, Long, String> value) throws Exception { + return value.f1; + } + }, Order.ASCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple3AscendingChecker())) + .distinct().collect(); + + String expected = "(true)\n"; + + 
compareResultAsText(result, expected); + } + +
@Test + public void testSortPartitionWithKeySelector2() throws Exception { + /* + * Test sort partition on an extracted composite (Tuple2) key, descending + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + List<Tuple1<Boolean>> result = ds + .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input + .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() { + @Override + public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) throws Exception { + return new Tuple2<>(value.f0, value.f1); + } + }, Order.DESCENDING) + .mapPartition(new OrderCheckMapper<>(new Tuple3Checker())) + .distinct().collect(); + + String expected = "(true)\n"; + + compareResultAsText(result, expected); + } + + private interface OrderChecker<T> extends Serializable { + boolean inOrder(T t1, T t2); + } + + @SuppressWarnings("serial") + private static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> { + @Override + public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) { + return t1.f1 >= t2.f1; + } + } + + @SuppressWarnings("serial") + private static class Tuple3AscendingChecker implements OrderChecker<Tuple3<Integer, Long, String>> { + @Override + public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) { + return t1.f1 <= t2.f1; + } + } + + @SuppressWarnings("serial") + private static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> { + @Override + public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1, + Tuple5<Integer, Long, Integer, String, Long> t2) { + return t1.f4 < t2.f4 || t1.f4.equals(t2.f4) && t1.f2 >= t2.f2; + } + } + + @SuppressWarnings("serial") + private static class NestedTupleChecker implements
OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> { + @Override + public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1, + Tuple2<Tuple2<Integer, Integer>, String> t2) { + return t1.f0.f1 < t2.f0.f1 || + t1.f0.f1.equals(t2.f0.f1) && t1.f1.compareTo(t2.f1) >= 0; + } + } + + @SuppressWarnings("serial") + private static class PojoChecker implements OrderChecker<POJO> { + @Override + public boolean inOrder(POJO t1, POJO t2) { + return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 || + t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 && + t1.number >= t2.number; + } + } + + @SuppressWarnings({"unused", "serial"}) + private static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> { + + OrderChecker<T> checker; + + public OrderCheckMapper() {} + + public OrderCheckMapper(OrderChecker<T> checker) { + this.checker = checker; + } + + @Override + public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out) throws Exception { + + Iterator<T> it = values.iterator(); + if (!it.hasNext()) { + out.collect(new Tuple1<>(true)); + } else { + T last = it.next(); + + while (it.hasNext()) { + T next = it.next(); + if (!checker.inOrder(last, next)) { + out.collect(new Tuple1<>(false)); + return; + } + last = next; + } + out.collect(new Tuple1<>(true)); + } + } + } + + @SuppressWarnings("serial") + private static class IdMapper<T> implements MapFunction<T, T> { + + @Override + public T map(T value) throws Exception { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java new file mode 100644 index
0000000..ebec17b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/SumMinMaxITCase.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for the Java API aggregations {@link DataSet#sum}, {@link DataSet#max} and + * {@link DataSet#min}, applied to whole data sets and to grouped data sets.
+ */ +@RunWith(Parameterized.class) +public class SumMinMaxITCase extends MultipleProgramsTestBase { + + public SumMinMaxITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testSumMaxAndProject() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> sumDs = ds + .sum(0) + .andMax(1) + .project(0, 1); + + List<Tuple2<Integer, Long>> result = sumDs.collect(); + + String expected = "231,6\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testGroupedAggregate() throws Exception { + /* + * Grouped Aggregate: sum of f0 for each distinct value of f1 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1) + .sum(0) + .project(1, 0); + + List<Tuple2<Long, Integer>> result = aggregateDs.collect(); + + String expected = "1,1\n" + + "2,5\n" + + "3,15\n" + + "4,34\n" + + "5,65\n" + + "6,111\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testNestedAggregate() throws Exception { + /* + * Nested Aggregate: minimum of f0 per f1 group, then the minimum over all group results + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1) + .min(0) + .min(0) + .project(0); + + List<Tuple1<Integer>> result = aggregateDs.collect(); + + String expected = "1\n"; + + compareResultAsTuples(result, expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java ---------------------------------------------------------------------- diff 
--git
a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java new file mode 100644 index 0000000..75bf8f0 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +/** + * Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}. 
+ */ +@RunWith(Parameterized.class) +public class TypeHintITCase extends JavaProgramTestBase { + + private static final int NUM_PROGRAMS = 9; + + private int curProgId = config.getInteger("ProgramId", -1); + + public TypeHintITCase(Configuration config) { + super(config); + } + + @Override + protected void testProgram() throws Exception { + TypeHintProgs.runProgram(curProgId); + } + + @Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for (int i = 1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class TypeHintProgs { + + public static void runProgram(int progId) throws Exception { + switch(progId) { + // Test identity map with missing types and string type hint + case 1: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds + .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>()) + .returns("Tuple3<Integer, Long, String>"); + List<Tuple3<Integer, Long, String>> result = identityMapDs.collect(); + + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test identity map with missing types and type information type hint + case 2: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds + // all following generics get erased during compilation + .map(new Mapper<Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>>()) + .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + List<Tuple3<Integer, Long, String>> result = identityMapDs + .collect(); + + String expectedResult = "(2,2,Hello)\n" + + "(3,2,Hello world)\n" + + "(1,1,Hi)\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test flat map with class type hint + case 3: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> identityMapDs = ds + .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>()) + .returns(Integer.class); + List<Integer> result = identityMapDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test join with type information type hint + case 4: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> resultDs = ds1 + .join(ds2) + .where(0) + .equalTo(0) + .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test flat join with type information type hint + case 5: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> 
resultDs = ds1 + .join(ds2) + .where(0) + .equalTo(0) + .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test unsorted group reduce with type information type hint + case 6: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> resultDs = ds + .groupBy(0) + .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test sorted group reduce with type information type hint + case 7: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> resultDs = ds + .groupBy(0) + .sortGroup(0, Order.ASCENDING) + .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test combine group with type information type hint + case 8: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> resultDs = ds + .groupBy(0) + .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); 
+ + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + // Test cogroup with type information type hint + case 9: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> resultDs = ds1 + .coGroup(ds2) + .where(0) + .equalTo(0) + .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>()) + .returns(BasicTypeInfo.INT_TYPE_INFO); + List<Integer> result = resultDs.collect(); + + String expectedResult = "2\n" + + "3\n" + + "1\n"; + + compareResultAsText(result, expectedResult); + break; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } + + // -------------------------------------------------------------------------------------------- + + private static class Mapper<T, V> implements MapFunction<T, V> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + @Override + public V map(T value) throws Exception { + return (V) value; + } + } + + private static class FlatMapper<T, V> implements FlatMapFunction<T, V> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void flatMap(T value, Collector<V> out) throws Exception { + out.collect((V) ((Tuple3) value).f0); + } + } + + private static class Joiner<IN1, IN2, OUT> implements JoinFunction<IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + @Override + public OUT join(IN1 first, IN2 second) throws Exception { + return (OUT) ((Tuple3) first).f0; + } + } + + private static class FlatJoiner<IN1, IN2, OUT> implements FlatJoinFunction<IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + @Override + public void join(IN1 
first, IN2 second, Collector<OUT> out) throws Exception { + out.collect((OUT) ((Tuple3) first).f0); + } + } + + private static class GroupReducer<IN, OUT> implements GroupReduceFunction<IN, OUT> { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception { + out.collect((OUT) ((Tuple3) values.iterator().next()).f0); + } + } + + private static class GroupCombiner<IN, OUT> implements GroupCombineFunction<IN, OUT> { + private static final long serialVersionUID = 1L; + + @Override + public void combine(Iterable<IN> values, Collector<OUT> out) throws Exception { + out.collect((OUT) ((Tuple3) values.iterator().next()).f0); + } + } + + private static class CoGrouper<IN1, IN2, OUT> implements CoGroupFunction<IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception { + out.collect((OUT) ((Tuple3) first.iterator().next()).f0); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java new file mode 100644 index 0000000..daa9cb1 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for {@link DataSet#union}. + */ +@RunWith(Parameterized.class) +public class UnionITCase extends MultipleProgramsTestBase { + + private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n" + + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + + "8,4,Comment#2\n" + + "9,4,Comment#3\n" + + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + + "12,5,Comment#6\n" + + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + + "21,6,Comment#15\n"; + + public UnionITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testUnion2IdenticalDataSets() throws Exception { + /* + * Union of 2 Same Data Sets + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + 
DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env)); + + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); + + String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING; + + compareResultAsTuples(result, expected); + } + + @Test + public void testUnion5IdenticalDataSets() throws Exception { + /* + * Union of 5 same Data Sets, with multiple unions + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env)) + .union(CollectionDataSets.get3TupleDataSet(env)) + .union(CollectionDataSets.get3TupleDataSet(env)) + .union(CollectionDataSets.get3TupleDataSet(env)); + + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); + + String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + + FULL_TUPLE_3_STRING + + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING; + + compareResultAsTuples(result, expected); + } + + @Test + public void testUnionWithEmptyDataSet() throws Exception { + /* + * Test on union with empty dataset + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Don't know how to make an empty result in an other way than filtering it + DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env). 
+ filter(new RichFilter1()); + + DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env) + .union(empty); + + List<Tuple3<Integer, Long, String>> result = unionDs.collect(); + + String expected = FULL_TUPLE_3_STRING; + + compareResultAsTuples(result, expected); + } + + private static class RichFilter1 extends RichFilterFunction<Tuple3<Integer, Long, String>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter(Tuple3<Integer, Long, String> value) throws Exception { + return false; + } + } + +}