http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index a41874c..e8ee0bb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -31,12 +31,13 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang3.Validate; -import org.apache.flink.api.common.functions.GenericCoGrouper; -import org.apache.flink.api.common.functions.GenericCrosser; -import org.apache.flink.api.common.functions.GenericFlatMap; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.api.common.functions.GenericJoiner; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.java.functions.InvalidTypesException; import org.apache.flink.api.java.functions.KeySelector; @@ -60,64 +61,75 @@ public class TypeExtractor { // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") - public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(GenericMap<IN, OUT> mapInterface, TypeInformation<IN> inType) { - 
validateInputType(GenericMap.class, mapInterface.getClass(), 0, inType); + public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) { + validateInputType(MapFunction.class, mapInterface.getClass(), 0, inType); if(mapInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericMap.class, mapInterface.getClass(), 1, inType, null); + return new TypeExtractor().privateCreateTypeInfo(MapFunction.class, mapInterface.getClass(), 1, inType, null); } @SuppressWarnings("unchecked") - public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(GenericFlatMap<IN, OUT> flatMapInterface, TypeInformation<IN> inType) { - validateInputType(GenericFlatMap.class, flatMapInterface.getClass(), 0, inType); + public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) { + validateInputType(FlatMapFunction.class, flatMapInterface.getClass(), 0, inType); if(flatMapInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) flatMapInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericFlatMap.class, flatMapInterface.getClass(), 1, inType, null); + return new TypeExtractor().privateCreateTypeInfo(FlatMapFunction.class, flatMapInterface.getClass(), 1, inType, null); } @SuppressWarnings("unchecked") - public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GenericGroupReduce<IN, OUT> groupReduceInterface, + public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType) { - validateInputType(GenericGroupReduce.class, groupReduceInterface.getClass(), 0, inType); + validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType); if(groupReduceInterface 
instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) groupReduceInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericGroupReduce.class, groupReduceInterface.getClass(), 1, inType, null); + return new TypeExtractor().privateCreateTypeInfo(GroupReduceFunction.class, groupReduceInterface.getClass(), 1, inType, null); } @SuppressWarnings("unchecked") - public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(GenericJoiner<IN1, IN2, OUT> joinInterface, + public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { - validateInputType(GenericJoiner.class, joinInterface.getClass(), 0, in1Type); - validateInputType(GenericJoiner.class, joinInterface.getClass(), 1, in2Type); + validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 0, in1Type); + validateInputType(FlatJoinFunction.class, joinInterface.getClass(), 1, in2Type); if(joinInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericJoiner.class, joinInterface.getClass(), 2, in1Type, in2Type); + return new TypeExtractor().privateCreateTypeInfo(FlatJoinFunction.class, joinInterface.getClass(), 2, in1Type, in2Type); + } + + @SuppressWarnings("unchecked") + public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface, + TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { + validateInputType(JoinFunction.class, joinInterface.getClass(), 0, in1Type); + validateInputType(JoinFunction.class, joinInterface.getClass(), 1, in2Type); + if(joinInterface instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable<OUT>) joinInterface).getProducedType(); + } + return new TypeExtractor().privateCreateTypeInfo(JoinFunction.class, joinInterface.getClass(), 2, 
in1Type, in2Type); } @SuppressWarnings("unchecked") - public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(GenericCoGrouper<IN1, IN2, OUT> coGroupInterface, + public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { - validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 0, in1Type); - validateInputType(GenericCoGrouper.class, coGroupInterface.getClass(), 1, in2Type); + validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 0, in1Type); + validateInputType(CoGroupFunction.class, coGroupInterface.getClass(), 1, in2Type); if(coGroupInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) coGroupInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericCoGrouper.class, coGroupInterface.getClass(), 2, in1Type, in2Type); + return new TypeExtractor().privateCreateTypeInfo(CoGroupFunction.class, coGroupInterface.getClass(), 2, in1Type, in2Type); } @SuppressWarnings("unchecked") - public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(GenericCrosser<IN1, IN2, OUT> crossInterface, + public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { - validateInputType(GenericCrosser.class, crossInterface.getClass(), 0, in1Type); - validateInputType(GenericCrosser.class, crossInterface.getClass(), 1, in2Type); + validateInputType(CrossFunction.class, crossInterface.getClass(), 0, in1Type); + validateInputType(CrossFunction.class, crossInterface.getClass(), 1, in2Type); if(crossInterface instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) crossInterface).getProducedType(); } - return new TypeExtractor().privateCreateTypeInfo(GenericCrosser.class, crossInterface.getClass(), 2, in1Type, in2Type); + return 
new TypeExtractor().privateCreateTypeInfo(CrossFunction.class, crossInterface.getClass(), 2, in1Type, in2Type); } @SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java index 8e0abcb..c786345 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java @@ -235,7 +235,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im try { for (; i < keyPositions.length; i++) { int keyPos = keyPositions[i]; - int cmp = comparators[i].compare(first.getField(keyPos), second.getField(keyPos)); + int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos)); if (cmp != 0) { return cmp; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java index 1159512..474b022 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java @@ -28,8 +28,6 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import 
org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; @@ -284,7 +282,7 @@ public class SemanticPropertiesTranslationTest { @ConstantFields("*") - public static class WildcardConstantMapper<T> extends MapFunction<T, T> { + public static class WildcardConstantMapper<T> extends RichMapFunction<T, T> { @Override public T map(T value) { @@ -293,7 +291,7 @@ public class SemanticPropertiesTranslationTest { } @ConstantFields("0->0;1->1;2->2") - public static class IndividualConstantMapper<X, Y, Z> extends MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> { + public static class IndividualConstantMapper<X, Y, Z> extends RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> { @Override public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) { @@ -302,7 +300,7 @@ public class SemanticPropertiesTranslationTest { } @ConstantFields("0") - public static class ZeroConstantMapper<T> extends MapFunction<T, T> { + public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> { @Override public T map(T value) { @@ -312,7 +310,7 @@ public class SemanticPropertiesTranslationTest { @ConstantFieldsFirst("1 -> 0") @ConstantFieldsSecond("1 -> 1") - public static class ForwardingTupleJoin<A, B, C, D> extends JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> { + public static class ForwardingTupleJoin<A, B, C, D> extends RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> { @Override public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> second) { @@ -322,7 +320,7 @@ public class SemanticPropertiesTranslationTest { @ConstantFieldsFirst("0 -> 0") @ConstantFieldsSecond("0 -> 1") - public static class ForwardingBasicJoin<A, B> 
extends JoinFunction<A, B, Tuple2<A, B>> { + public static class ForwardingBasicJoin<A, B> extends RichJoinFunction<A, B, Tuple2<A, B>> { @Override public Tuple2<A, B> join(A first, B second) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java index db795d9..155bbd1 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java @@ -35,9 +35,9 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.CoGroupFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichCoGroupFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; @@ -128,8 +128,14 @@ public class DeltaIterationTranslationTest implements java.io.Serializable { assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass()); assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass()); - assertEquals(SolutionWorksetJoin.class, 
solutionSetJoin.getUserCodeWrapper().getUserCodeClass()); - + if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) { + WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject(); + assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass()); + } + else { + assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass()); + } + assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName()); assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName()); @@ -215,21 +221,21 @@ public class DeltaIterationTranslationTest implements java.io.Serializable { // -------------------------------------------------------------------------------------------- - public static class SolutionWorksetJoin extends JoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> { + public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> { @Override public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){ return null; } } - public static class NextWorksetMapper extends MapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> { + public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> { @Override public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) { return null; } } - public static class IdentityMapper<T> extends MapFunction<T, T> { + public static class IdentityMapper<T> extends RichMapFunction<T, T> { @Override public T map(T value) throws Exception { @@ -237,7 +243,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable { } } - public static class SolutionWorksetCoGroup1 extends CoGroupFunction<Tuple2<Double, String>, 
Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> { + public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> { @Override public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second, @@ -245,7 +251,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable { } } - public static class SolutionWorksetCoGroup2 extends CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> { + public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> { @Override public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java index 9f6a6d8..8e457ce 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java @@ -27,9 +27,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.ReduceFunction; -import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; 
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -53,7 +51,7 @@ public class ReduceTranslationTests implements java.io.Serializable { DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env); - initialData.reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() { + initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() { public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) { return value1; } @@ -94,7 +92,7 @@ public class ReduceTranslationTests implements java.io.Serializable { initialData .groupBy(2) - .reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() { + .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() { public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) { return value1; } @@ -141,7 +139,7 @@ public class ReduceTranslationTests implements java.io.Serializable { return value.f1; } }) - .reduce(new ReduceFunction<Tuple3<Double,StringValue,LongValue>>() { + .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() { public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) { return value1; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index b284052..c6ad73d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -23,16 +23,16 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.functions.CoGroupFunction; -import org.apache.flink.api.java.functions.CrossFunction; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; +import org.apache.flink.api.java.functions.RichCoGroupFunction; +import org.apache.flink.api.java.functions.RichCrossFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.InvalidTypesException; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichFlatJoinFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -68,7 +68,7 @@ public class TypeExtractorTest { @Test public void testBasicType() { // use getGroupReduceReturnTypes() - GroupReduceFunction<?, ?> function = new GroupReduceFunction<Boolean, Boolean>() { + RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() { private static final long serialVersionUID = 1L; @Override @@ -107,7 +107,7 @@ public class 
TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testWritableType() { - MapFunction<?, ?> function = new MapFunction<MyWritable, MyWritable>() { + RichMapFunction<?, ?> function = new RichMapFunction<MyWritable, MyWritable>() { private static final long serialVersionUID = 1L; @Override @@ -127,7 +127,7 @@ public class TypeExtractorTest { @Test public void testTupleWithBasicTypes() throws Exception { // use getMapReturnTypes() - MapFunction<?, ?> function = new MapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>() { private static final long serialVersionUID = 1L; @Override @@ -192,7 +192,7 @@ public class TypeExtractorTest { @Test public void testTupleWithTuples() { // use getFlatMapReturnTypes() - FlatMapFunction<?, ?> function = new FlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() { + RichFlatMapFunction<?, ?> function = new RichFlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() { private static final long serialVersionUID = 1L; @Override @@ -247,12 +247,12 @@ public class TypeExtractorTest { @Test public void testSubclassOfTuple() { // use getJoinReturnTypes() - JoinFunction<?, ?, ?> function = new JoinFunction<CustomTuple, String, CustomTuple>() { + RichFlatJoinFunction<?, ?, ?> function = new RichFlatJoinFunction<CustomTuple, String, CustomTuple>() { private static final long serialVersionUID = 1L; @Override - public CustomTuple join(CustomTuple first, String second) throws Exception { - return null; + public void 
join(CustomTuple first, String second, Collector<CustomTuple> out) throws Exception { + out.collect(null); } }; @@ -295,7 +295,7 @@ public class TypeExtractorTest { @Test public void testCustomType() { // use getCrossReturnTypes() - CrossFunction<?, ?, ?> function = new CrossFunction<CustomType, Integer, CustomType>() { + RichCrossFunction<?, ?, ?> function = new RichCrossFunction<CustomType, Integer, CustomType>() { private static final long serialVersionUID = 1L; @Override @@ -342,7 +342,7 @@ public class TypeExtractorTest { @Test public void testTupleWithCustomType() { // use getMapReturnTypes() - MapFunction<?, ?> function = new MapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() { private static final long serialVersionUID = 1L; @Override @@ -412,7 +412,7 @@ public class TypeExtractorTest { @Test public void testTupleOfValues() { // use getMapReturnTypes() - MapFunction<?, ?> function = new MapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() { private static final long serialVersionUID = 1L; @Override @@ -451,7 +451,7 @@ public class TypeExtractorTest { @Test public void testGenericsNotInSuperclass() { // use getMapReturnTypes() - MapFunction<?, ?> function = new MapFunction<LongKeyValue<String>, LongKeyValue<String>>() { + RichMapFunction<?, ?> function = new RichMapFunction<LongKeyValue<String>, LongKeyValue<String>>() { private static final long serialVersionUID = 1L; @Override @@ -494,7 +494,7 @@ public class TypeExtractorTest { @Test public void testChainedGenericsNotInSuperclass() { // use TypeExtractor - MapFunction<?, ?> function = new MapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() { + RichMapFunction<?, ?> function = new RichMapFunction<ChainedTwo<Integer>, 
ChainedTwo<Integer>>() { private static final long serialVersionUID = 1L; @Override @@ -536,7 +536,7 @@ public class TypeExtractorTest { @Test public void testGenericsInDirectSuperclass() { // use TypeExtractor - MapFunction<?, ?> function = new MapFunction<ChainedThree, ChainedThree>() { + RichMapFunction<?, ?> function = new RichMapFunction<ChainedThree, ChainedThree>() { private static final long serialVersionUID = 1L; @Override @@ -562,7 +562,7 @@ public class TypeExtractorTest { @Test public void testGenericsNotInSuperclassWithNonGenericClassAtEnd() { // use TypeExtractor - MapFunction<?, ?> function = new MapFunction<ChainedFour, ChainedFour>() { + RichMapFunction<?, ?> function = new RichMapFunction<ChainedFour, ChainedFour>() { private static final long serialVersionUID = 1L; @Override @@ -587,7 +587,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testMissingTupleGenericsException() { - MapFunction<?, ?> function = new MapFunction<String, Tuple2>() { + RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple2>() { private static final long serialVersionUID = 1L; @Override @@ -607,7 +607,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testTupleSupertype() { - MapFunction<?, ?> function = new MapFunction<String, Tuple>() { + RichMapFunction<?, ?> function = new RichMapFunction<String, Tuple>() { private static final long serialVersionUID = 1L; @Override @@ -635,7 +635,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testSameGenericVariable() { - MapFunction<?, ?> function = new MapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() { + RichMapFunction<?, ?> function = new RichMapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() { private static final long serialVersionUID = 1L; @Override @@ -667,7 +667,7 @@ public class TypeExtractorTest { @SuppressWarnings({ 
"unchecked", "rawtypes" }) @Test public void testNestedTupleGenerics() { - MapFunction<?, ?> function = new MapFunction<Nested<String, Integer>, Nested<String, Integer>>() { + RichMapFunction<?, ?> function = new RichMapFunction<Nested<String, Integer>, Nested<String, Integer>>() { private static final long serialVersionUID = 1L; @Override @@ -706,7 +706,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testNestedTupleGenerics2() { - MapFunction<?, ?> function = new MapFunction<Nested2<Boolean>, Nested2<Boolean>>() { + RichMapFunction<?, ?> function = new RichMapFunction<Nested2<Boolean>, Nested2<Boolean>>() { private static final long serialVersionUID = 1L; @Override @@ -746,7 +746,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testFunctionWithMissingGenerics() { - MapFunction function = new MapFunction() { + RichMapFunction function = new RichMapFunction() { private static final long serialVersionUID = 1L; @Override @@ -776,7 +776,7 @@ public class TypeExtractorTest { Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); } - public class IdentityMapper<T> extends MapFunction<T, T> { + public class IdentityMapper<T> extends RichMapFunction<T, T> { private static final long serialVersionUID = 1L; @Override @@ -807,7 +807,7 @@ public class TypeExtractorTest { } } - public class IdentityMapper2<T> extends MapFunction<Tuple2<T, String>, T> { + public class IdentityMapper2<T> extends RichMapFunction<Tuple2<T, String>, T> { private static final long serialVersionUID = 1L; @Override @@ -843,7 +843,7 @@ public class TypeExtractorTest { Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1)); } - public class IdentityMapper3<T, V> extends MapFunction<T, V> { + public class IdentityMapper3<T, V> extends RichMapFunction<T, V> { private static final long serialVersionUID = 1L; @Override @@ -916,7 +916,7 @@ public class TypeExtractorTest { 
@SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testFunctionWithNoGenericSuperclass() { - MapFunction<?, ?> function = new Mapper2(); + RichMapFunction<?, ?> function = new Mapper2(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String")); @@ -924,7 +924,7 @@ public class TypeExtractorTest { Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); } - public class OneAppender<T> extends MapFunction<T, Tuple2<T, Integer>> { + public class OneAppender<T> extends RichMapFunction<T, Tuple2<T, Integer>> { private static final long serialVersionUID = 1L; public Tuple2<T, Integer> map(T value) { @@ -935,7 +935,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testFunctionDependingPartialOnInput() { - MapFunction<?, ?> function = new OneAppender<DoubleValue>() { + RichMapFunction<?, ?> function = new OneAppender<DoubleValue>() { private static final long serialVersionUID = 1L; }; @@ -955,7 +955,7 @@ public class TypeExtractorTest { @Test public void testFunctionDependingPartialOnInput2() { - MapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>(); + RichMapFunction<DoubleValue, ?> function = new OneAppender<DoubleValue>(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new ValueTypeInfo<DoubleValue>(DoubleValue.class)); @@ -971,7 +971,7 @@ public class TypeExtractorTest { Assert.assertEquals(Integer.class , tti.getTypeAt(1).getTypeClass()); } - public class FieldDuplicator<T> extends MapFunction<T, Tuple2<T, T>> { + public class FieldDuplicator<T> extends RichMapFunction<T, Tuple2<T, T>> { private static final long serialVersionUID = 1L; public Tuple2<T, T> map(T value) { @@ -981,7 +981,7 @@ public class TypeExtractorTest { @Test public void testFunctionInputInOutputMultipleTimes() { - MapFunction<Float, ?> function = new FieldDuplicator<Float>(); + RichMapFunction<Float, ?> function = new 
FieldDuplicator<Float>(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.FLOAT_TYPE_INFO); @@ -994,7 +994,7 @@ public class TypeExtractorTest { @Test public void testFunctionInputInOutputMultipleTimes2() { - MapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>(); + RichMapFunction<Tuple2<Float, Float>, ?> function = new FieldDuplicator<Tuple2<Float, Float>>(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo<Tuple2<Float, Float>>( BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.FLOAT_TYPE_INFO)); @@ -1023,7 +1023,7 @@ public class TypeExtractorTest { @Test public void testAbstractAndInterfaceTypesException() { - MapFunction<String, ?> function = new MapFunction<String, Testable>() { + RichMapFunction<String, ?> function = new RichMapFunction<String, Testable>() { private static final long serialVersionUID = 1L; @Override @@ -1039,7 +1039,7 @@ public class TypeExtractorTest { // good } - MapFunction<String, ?> function2 = new MapFunction<String, AbstractClass>() { + RichMapFunction<String, ?> function2 = new RichMapFunction<String, AbstractClass>() { private static final long serialVersionUID = 1L; @Override @@ -1059,7 +1059,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testValueSupertypeException() { - MapFunction<?, ?> function = new MapFunction<StringValue, Value>() { + RichMapFunction<?, ?> function = new RichMapFunction<StringValue, Value>() { private static final long serialVersionUID = 1L; @Override @@ -1080,7 +1080,7 @@ public class TypeExtractorTest { @Test public void testBasicArray() { // use getCoGroupReturnTypes() - CoGroupFunction<?, ?, ?> function = new CoGroupFunction<String[], String[], String[]>() { + RichCoGroupFunction<?, ?, ?> function = new RichCoGroupFunction<String[], String[], String[]>() { private static final long serialVersionUID = 1L; @Override @@ -1107,7 +1107,7 @@ public class 
TypeExtractorTest { @Test public void testBasicArray2() { - MapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>(); + RichMapFunction<Boolean[], ?> function = new IdentityMapper<Boolean[]>(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO); @@ -1122,7 +1122,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testCustomArray() { - MapFunction<?, ?> function = new MapFunction<CustomArrayObject[], CustomArrayObject[]>() { + RichMapFunction<?, ?> function = new RichMapFunction<CustomArrayObject[], CustomArrayObject[]>() { private static final long serialVersionUID = 1L; @Override @@ -1140,7 +1140,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testTupleArray() { - MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() { private static final long serialVersionUID = 1L; @Override @@ -1167,7 +1167,7 @@ public class TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testCustomArrayWithTypeVariable() { - MapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>(); + RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>(); TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]")); @@ -1178,7 +1178,7 @@ public class TypeExtractorTest { Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(0)); } - public class GenericArrayClass<T> extends MapFunction<T[], T[]> { + public class GenericArrayClass<T> extends RichMapFunction<T[], T[]> { private static final long serialVersionUID = 1L; @Override @@ -1207,7 +1207,7 @@ public class 
TypeExtractorTest { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testParamertizedCustomObject() { - MapFunction<?, ?> function = new MapFunction<MyObject<String>, MyObject<String>>() { + RichMapFunction<?, ?> function = new RichMapFunction<MyObject<String>, MyObject<String>>() { private static final long serialVersionUID = 1L; @Override @@ -1242,7 +1242,7 @@ public class TypeExtractorTest { @Test public void testInputMismatchExceptions() { - MapFunction<?, ?> function = new MapFunction<Tuple2<String, String>, String>() { + RichMapFunction<?, ?> function = new RichMapFunction<Tuple2<String, String>, String>() { private static final long serialVersionUID = 1L; @Override @@ -1265,7 +1265,7 @@ public class TypeExtractorTest { // right } - MapFunction<?, ?> function2 = new MapFunction<StringValue, String>() { + RichMapFunction<?, ?> function2 = new RichMapFunction<StringValue, String>() { private static final long serialVersionUID = 1L; @Override @@ -1281,7 +1281,7 @@ public class TypeExtractorTest { // right } - MapFunction<?, ?> function3 = new MapFunction<Tuple1<Integer>[], String>() { + RichMapFunction<?, ?> function3 = new RichMapFunction<Tuple1<Integer>[], String>() { private static final long serialVersionUID = 1L; @Override @@ -1297,7 +1297,7 @@ public class TypeExtractorTest { // right } - MapFunction<?, ?> function4 = new MapFunction<Writable, String>() { + RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, String>() { private static final long serialVersionUID = 1L; @Override @@ -1314,7 +1314,7 @@ public class TypeExtractorTest { } } - public static class DummyFlatMapFunction<A,B,C,D> extends FlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> { + public static class DummyFlatMapFunction<A,B,C,D> extends RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> { private static final long serialVersionUID = 1L; @Override @@ -1336,7 +1336,7 @@ public class TypeExtractorTest { } } - public static class MyQueryableMapper<A> extends MapFunction<String, 
A> implements ResultTypeQueryable<A> { + public static class MyQueryableMapper<A> extends RichMapFunction<String, A> implements ResultTypeQueryable<A> { private static final long serialVersionUID = 1L; @SuppressWarnings("unchecked") @@ -1359,7 +1359,7 @@ public class TypeExtractorTest { @Test public void testTupleWithPrimitiveArray() { - MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new MapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() { + RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>> function = new RichMapFunction<Integer, Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], String[]>>() { private static final long serialVersionUID = 1L; @Override @@ -1382,8 +1382,8 @@ public class TypeExtractorTest { } @Test - public void testInterface() { - GenericMap<String, Boolean> mapInterface = new GenericMap<String, Boolean>() { + public void testFunction() { + RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() { @Override public void setRuntimeContext(RuntimeContext t) { @@ -1392,7 +1392,6 @@ public class TypeExtractorTest { @Override public void open(Configuration parameters) throws Exception { - } @Override @@ -1414,4 +1413,17 @@ public class TypeExtractorTest { TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO); Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); } + + @Test + public void testInterface() { + MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() { + @Override + public Boolean map(String record) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-java8-tests/pom.xml b/flink-java8-tests/pom.xml new file mode 100644 index 0000000..2587776 --- /dev/null +++ b/flink-java8-tests/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>0.6-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-java8-tests</artifactId> + <name>flink-java8-tests</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.7</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <!-- just define the Java version to be used for compiling and plugins --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version><!--$NO-MVN-MAN-VER$--> + <configuration> + <source>1.8</source> + <target>1.8</target> + <!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> --> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <log.level>WARN</log.level> + 
</systemPropertyVariables> + <forkMode>once</forkMode> + <argLine>-Xmx1024m</argLine> + </configuration> + </plugin> + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <log.level>WARN</log.level> + </systemPropertyVariables> + <forkMode>always</forkMode> + <threadCount>1</threadCount> + <perCoreThreadCount>false</perCoreThreadCount> + </configuration> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId> + org.apache.maven.plugins + </groupId> + <artifactId> + maven-assembly-plugin + </artifactId> + <versionRange> + [2.4,) + </versionRange> + <goals> + <goal>single</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java new file mode 100644 index 0000000..c417249 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +/** + * Passes a Java 8 lambda to {@code coGroup(...).where(0).equalTo(0).with(...)} on two small tuple data sets. + * The test passes when the call chain and {@code env.execute()} complete, or when an + * {@link UnsupportedLambdaExpressionException} is thrown; any other exception fails it. + */ +public class CoGroupITCase implements Serializable { + + @Test + public void testCoGroupLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, String>> left = env.fromElements( + new Tuple2<Integer, String>(1, "hello"), + new Tuple2<Integer, String>(2, "what's"), + new Tuple2<Integer, String>(2, "up") + ); + DataSet<Tuple2<Integer, String>> right = env.fromElements( + new Tuple2<Integer, String>(1, "not"), + new Tuple2<Integer, String>(1, "much"), + new Tuple2<Integer, String>(2, "really") + ); + DataSet<Tuple2<Integer,String>> joined = left.coGroup(right).where(0).equalTo(0) + .with((values1, values2, out) -> { + int sum = 0; + String conc = ""; + // Read each element exactly once: a separate next() call for f0 and for f1 + // would take the two fields from different tuples and can run past the end of the group. + while (values1.hasNext()) { + Tuple2<Integer, String> v1 = values1.next(); + sum += v1.f0; + conc += v1.f1; + } + while (values2.hasNext()) { + Tuple2<Integer, String> v2 = values2.next(); + sum += v2.f0; + conc += v2.f1; + } + }); + 
env.execute(); + + + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java new file mode 100644 index 0000000..f8d217e --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class CrossITCase implements Serializable { + + @Test + public void testCrossLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, String>> left = env.fromElements( + new Tuple2<Integer, String>(1, "hello"), + new Tuple2<Integer, String>(2, "what's"), + new Tuple2<Integer, String>(2, "up") + ); + DataSet<Tuple2<Integer, String>> right = env.fromElements( + new Tuple2<Integer, String>(1, "not"), + new Tuple2<Integer, String>(1, "much"), + new Tuple2<Integer, String>(2, "really") + ); + DataSet<Tuple2<Integer,String>> joined = left.cross(right) + .with((t,s) -> new Tuple2<Integer, String> (t.f0 + s.f0, t.f1 + " " + s.f1)); + + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java new file mode 100644 index 0000000..c775425 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +@RunWith(Parameterized.class) +public class FilterITCase extends JavaProgramTestBase { + + + public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) { + + List<Tuple3<Integer, Long, String>> data = new ArrayList<Tuple3<Integer, Long, String>>(); + data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi")); + data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello")); + data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world")); + data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, how are you?")); + data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine.")); + data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke 
Skywalker")); + data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1")); + data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2")); + data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3")); + data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4")); + data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5")); + data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6")); + data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7")); + data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8")); + data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9")); + data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10")); + data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11")); + data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12")); + data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13")); + data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14")); + data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15")); + + Collections.shuffle(data); + + return env.fromCollection(data); + } + + private static int NUM_PROGRAMS = 1; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public FilterITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = FilterProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + 
config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class FilterProgs { + + public static String runProgram(int progId, String resultPath) throws Exception { + + switch(progId) { + case 1: { + /* + * Test lambda filter + * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(value -> value.f2.contains("world")); + filterDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + + } + + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java new file mode 100644 index 0000000..043b4e8 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class FlatJoinITCase implements Serializable { + + @Test + public void testFlatJoinLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, String>> left = env.fromElements( + new Tuple2<Integer, String>(1, "hello"), + new Tuple2<Integer, String>(2, "what's"), + new Tuple2<Integer, String>(2, "up") + ); + DataSet<Tuple2<Integer, String>> right = env.fromElements( + new Tuple2<Integer, String>(1, "not"), + new Tuple2<Integer, String>(1, "much"), + new Tuple2<Integer, String>(2, "really") + ); + DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0) + .with((t,s,out) -> out.collect(new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1))); + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java new file mode 100644 index 0000000..55f507c --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class FlatMapITCase implements Serializable { + + @Test + public void testFlatMapLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad"); + DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b"))); + env.execute(); + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java new file mode 100644 index 0000000..494aff6 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class GroupReduceITCase implements Serializable { + + @Test + public void testAllGroupReduceLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad"); + DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> { + String conc = ""; + while (values.hasNext()) { + String s = values.next(); + conc = conc.concat(s); + } + out.collect(conc); + }); + env.execute(); + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGroupReduceLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer,String>> stringDs = env.fromElements( + new Tuple2<Integer,String>(1, "aa"), + new Tuple2<Integer,String>(2, "ab"), + new Tuple2<Integer,String>(1, "ac"), + new Tuple2<Integer,String>(2, "ad") + ); + DataSet<String> concatDs = stringDs + .groupBy(0) + .reduceGroup((values, out) -> { + String conc = ""; + while (values.hasNext()) { + String s = values.next().f1; + conc = conc.concat(s); + } + 
out.collect(conc); + }); + env.execute(); + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java new file mode 100644 index 0000000..3f4f696 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class JoinITCase implements Serializable { + + @Test + public void testJoinLambda() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Integer, String>> left = env.fromElements( + new Tuple2<Integer, String>(1, "hello"), + new Tuple2<Integer, String>(2, "what's"), + new Tuple2<Integer, String>(2, "up") + ); + DataSet<Tuple2<Integer, String>> right = env.fromElements( + new Tuple2<Integer, String>(1, "not"), + new Tuple2<Integer, String>(1, "much"), + new Tuple2<Integer, String>(2, "really") + ); + DataSet<Tuple2<Integer,String>> joined = left.join(right).where(0).equalTo(0) + .with((t,s) -> new Tuple2<Integer,String>(t.f0, t.f1 + " " + s.f1)); // combine the left string with the matching right string + + } catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } catch (Exception e) { + Assert.fail(); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java new file mode 100644 index 0000000..3af360b --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; + +public class MapITCase implements Serializable{ + + @Test + public void TestMapLambda () { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad"); + DataSet<String> mappedDs = stringDs.map (s -> s.replace("a", "b")); + env.execute(); + } + catch (UnsupportedLambdaExpressionException e) { + // Success + return; + } + catch (Exception e) { + Assert.fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java new file mode 100644 index 0000000..ab27fe4 
--- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +@RunWith(Parameterized.class) +public class ReduceITCase extends JavaProgramTestBase { + + public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) { + + List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, 
Long>>(); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(1,1l,0,"Hallo",1l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,2l,1,"Hallo Welt",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(2,3l,2,"Hallo Welt wie",1l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,4l,3,"Hallo Welt wie gehts?",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,5l,4,"ABC",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(3,6l,5,"BCD",3l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,7l,6,"CDE",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,8l,7,"DEF",1l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,9l,8,"EFG",1l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(4,10l,9,"FGH",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,11l,10,"GHI",1l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,12l,11,"HIJ",3l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,13l,12,"IJK",3l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,14l,13,"JKL",2l)); + data.add(new Tuple5<Integer, Long, Integer, String, Long>(5,15l,14,"KLM",2l)); + + Collections.shuffle(data); + + TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new + TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>>( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO + ); + + return env.fromCollection(data, type); + } + + private static int NUM_PROGRAMS = 1; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public ReduceITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void 
testProgram() throws Exception { + expectedResult = ReduceProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class ReduceProgs { + + public static String runProgram(int progId, String resultPath) throws Exception { + + switch(progId) { + case 1: { + /* + * Test reduce with lambda + * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds + .groupBy(4, 0) + .reduce((in1, in2) -> { + Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); + out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); + return out; + }); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 636c492..34cd232 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.LongSumAggregator; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -54,7 +54,7 @@ import java.io.IOException; /** * The base class for all tasks able to participate in an iteration. 
*/ -public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT> +public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT> implements Terminable { private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java index d7f3b50..7a77cff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -75,7 +75,7 @@ import org.apache.flink.util.MutableObjectIterator; * The type of the feed-back data set (bulk partial solution / workset). 
For bulk iterations, {@code Y} is the * same as {@code X} */ -public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> { +public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> { private static final Log log = LogFactory.getLog(IterationHeadPactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java index 25a6149..2a8325c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.runtime.io.network.api.BufferWriter; import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent; import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; @@ -41,7 +41,7 @@ import org.apache.flink.util.Collector; * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case * this task must be scheduled on the same instance as the head. 
*/ -public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> { +public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> { private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java index 570630f..942e2f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; @@ -38,7 +38,7 @@ import org.apache.flink.util.Collector; * <p/> * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish. 
*/ -public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> +public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> implements PactTaskContext<S, OT> { private static final Log log = LogFactory.getLog(IterationTailPactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java index fe70171..d7d63af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -30,7 +30,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> { +public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator; 
@@ -110,7 +110,7 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M @Override public void run() throws Exception { - final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub(); + final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); if (buildSideIndex == 0) {
