http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java index 2f749d4..4c8177a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.aggregators.LongSumAggregator; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; @@ -147,8 +147,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP } } - public static final class NeighborWithComponentIDJoin extends JoinFunction - <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -161,8 +160,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP } } - public static final class MinimumReduce extends GroupReduceFunction - <Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); @@ -189,8 +187,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP } @SuppressWarnings("serial") - public static final class MinimumIdFilter extends FlatMapFunction - <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { + public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { private static LongSumAggregatorWithParameter aggr;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java index fa1676f..104c3df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java @@ -25,9 +25,9 @@ import java.util.List; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; import org.apache.flink.api.common.aggregators.LongSumAggregator; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; @@ -139,8 +139,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java } } - public static final class NeighborWithComponentIDJoin extends JoinFunction - <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; @@ -153,8 +152,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java } } - public static final class MinimumReduce extends GroupReduceFunction - <Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private static final long serialVersionUID = 1L; final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>(); @@ -181,8 +179,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java } @SuppressWarnings("serial") - public static final class MinimumIdFilter extends FlatMapFunction - <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { + public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { private static LongSumAggregator aggr; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java index 0475a4f..1ec0eb4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java @@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Iterator; import java.util.Set; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.iterative.nephele.ConfigUtils; import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank; @@ -32,7 +32,7 @@ import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats; import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator; import org.apache.flink.util.Collector; -public class CustomCompensatableDotProductCoGroup extends AbstractFunction implements GenericCoGrouper<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> { +public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java index 28c77ba..b44d914 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java @@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Random; import java.util.Set; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.iterative.nephele.ConfigUtils; import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList; @@ -31,8 +31,8 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling; import org.apache.flink.util.Collector; -public class CustomCompensatableDotProductMatch extends AbstractFunction implements - GenericJoiner<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank> +public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements + FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java index 74426c0..d83b33b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java @@ -21,7 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Set; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.iterative.nephele.ConfigUtils; @@ -29,7 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats; import org.apache.flink.util.Collector; -public class CustomCompensatingMap extends AbstractFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> { +public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java index 8af9247..1e08a9f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java @@ -20,15 +20,15 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import java.util.Iterator; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank; import org.apache.flink.util.Collector; -public class CustomRankCombiner extends AbstractFunction implements GenericGroupReduce<VertexWithRank, VertexWithRank>, - GenericCombine<VertexWithRank> +public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>, + FlatCombineFunction<VertexWithRank> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index b914c1c..3749c1d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -24,8 +24,9 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; -import org.apache.flink.api.java.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.RichCoGroupFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -301,7 +302,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } - public static class Tuple5CoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> { + public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; @@ -330,7 +331,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } } - public static class CustomTypeCoGroup extends CoGroupFunction<CustomType, CustomType, CustomType> { + public static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> { private static final long serialVersionUID = 1L; @@ -358,7 +359,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } - public static class MixedCoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { + public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -388,7 +389,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } - public static class MixedCoGroup2 extends CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> { + public static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> { private static final long serialVersionUID = 1L; @@ -417,7 +418,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } - public static class Tuple3ReturnLeft extends CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + public static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -434,7 +435,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } } - public static class Tuple5ReturnRight extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + public static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { private static final long serialVersionUID = 1L; @@ -456,7 +457,7 @@ public class CoGroupITCase extends JavaProgramTestBase { } - public static class Tuple5CoGroupBC extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { + public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java index dabe7fc..304dda2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java @@ -23,7 +23,8 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import org.apache.flink.api.java.functions.CrossFunction; +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.java.functions.RichCrossFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -400,7 +401,7 @@ public class CrossITCase extends JavaProgramTestBase { } - public static class Tuple5Cross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> { + public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; @@ -416,7 +417,7 @@ public class CrossITCase extends JavaProgramTestBase { } - public static class CustomTypeCross extends CrossFunction<CustomType, CustomType, CustomType> { + public static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> { private static final long serialVersionUID = 1L; @@ -429,7 +430,7 @@ public class CrossITCase extends JavaProgramTestBase { } - public static class MixedCross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { + public static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -444,7 +445,7 @@ public class CrossITCase extends JavaProgramTestBase { } - public static class Tuple3ReturnLeft extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { + public static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -457,7 +458,7 @@ public class CrossITCase extends JavaProgramTestBase { } } - public static class Tuple5ReturnRight extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + public static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { private static final long serialVersionUID = 1L; @@ -473,7 +474,7 @@ public class CrossITCase extends JavaProgramTestBase { } - public static class Tuple5CrossBC extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { + public static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java index 203117c..0c6f3cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java @@ -23,7 +23,7 @@ import java.util.Collection; import java.util.LinkedList; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -164,7 +164,7 @@ public class DistinctITCase extends JavaProgramTestBase { return in.myInt; } }) - .map(new MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() { + .map(new RichMapFunction<CustomType, Tuple1<Integer>>() { @Override public Tuple1<Integer> map(CustomType value) throws Exception { return new Tuple1<Integer>(value.myInt); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java index 96174da..6613bc1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java @@ -23,7 +23,8 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import org.apache.flink.api.java.functions.FilterFunction; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.functions.RichFilterFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; @@ -38,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) public class FilterITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 8; + private static int NUM_PROGRAMS = 8; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -268,7 +269,7 @@ public class FilterITCase extends JavaProgramTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. - filter(new FilterFunction<Tuple3<Integer,Long,String>>() { + filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; int literal = -1; @@ -306,7 +307,7 @@ public class FilterITCase extends JavaProgramTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> filterDs = ds. - filter(new FilterFunction<Tuple3<Integer,Long,String>>() { + filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; private int broadcastSum = 0; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java index 1c97347..a6dd377 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java @@ -23,7 +23,8 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import org.apache.flink.api.java.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; @@ -323,7 +324,7 @@ public class FlatMapITCase extends JavaProgramTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds. - flatMap(new FlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { + flatMap(new RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> outTuple = new Tuple3<Integer, Long, String>(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 6556b5e..7376e86 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -24,10 +24,11 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.GroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -89,332 +90,336 @@ public class GroupReduceITCase extends JavaProgramTestBase { private static class GroupReduceProgs { public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { + + switch (progId) { + case 1: { /* * check correctness of groupReduce on tuples with key field selector */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<Integer, Long>> reduceDs = ds. - groupBy(1).reduceGroup(new Tuple3GroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1\n" + - "5,2\n" + - "15,3\n" + - "34,4\n" + - "65,5\n" + - "111,6\n"; - } - case 2: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> reduceDs = ds. + groupBy(1).reduceGroup(new Tuple3GroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1\n" + + "5,2\n" + + "15,3\n" + + "34,4\n" + + "65,5\n" + + "111,6\n"; + } + case 2: { /* * check correctness of groupReduce on tuples with multiple key field selector */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. - groupBy(4,0).reduceGroup(new Tuple5GroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,0,P-),1\n" + - "2,3,0,P-),1\n" + - "2,2,0,P-),2\n" + - "3,9,0,P-),2\n" + - "3,6,0,P-),3\n" + - "4,17,0,P-),1\n" + - "4,17,0,P-),2\n" + - "5,11,0,P-),1\n" + - "5,29,0,P-),2\n" + - "5,25,0,P-),3\n"; - } - case 3: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. + groupBy(4, 0).reduceGroup(new Tuple5GroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,0,P-),1\n" + + "2,3,0,P-),1\n" + + "2,2,0,P-),2\n" + + "3,9,0,P-),2\n" + + "3,6,0,P-),3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,0,P-),1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + } + case 3: { /* * check correctness of groupReduce on tuples with key field selector and group sorting */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(1); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. - groupBy(1).sortGroup(2,Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,Hi\n" + - "5,2,Hello-Hello world\n" + - "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" + - "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" + - "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" + - "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n"; - - } - case 4: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. + groupBy(1).sortGroup(2, Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,Hi\n" + + "5,2,Hello-Hello world\n" + + "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" + + "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" + + "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" + + "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n"; + + } + case 4: { /* * check correctness of groupReduce on tuples with key extractor */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<Integer, Long>> reduceDs = ds. - groupBy(new KeySelector<Tuple3<Integer,Long,String>, Long>() { - private static final long serialVersionUID = 1L; - @Override - public Long getKey(Tuple3<Integer, Long, String> in) { - return in.f1; - } - }).reduceGroup(new Tuple3GroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1\n" + - "5,2\n" + - "15,3\n" + - "34,4\n" + - "65,5\n" + - "111,6\n"; - - } - case 5: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> reduceDs = ds. + groupBy(new KeySelector<Tuple3<Integer, Long, String>, Long>() { + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(Tuple3<Integer, Long, String> in) { + return in.f1; + } + }).reduceGroup(new Tuple3GroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1\n" + + "5,2\n" + + "15,3\n" + + "34,4\n" + + "65,5\n" + + "111,6\n"; + + } + case 5: { /* * check correctness of groupReduce on custom type with type extractor */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> reduceDs = ds. - groupBy(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).reduceGroup(new CustomTypeGroupReduce()); - - reduceDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "1,0,Hello!\n" + - "2,3,Hello!\n" + - "3,12,Hello!\n" + - "4,30,Hello!\n" + - "5,60,Hello!\n" + - "6,105,Hello!\n"; - } - case 6: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> reduceDs = ds. + groupBy(new KeySelector<CustomType, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + }).reduceGroup(new CustomTypeGroupReduce()); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1,0,Hello!\n" + + "2,3,Hello!\n" + + "3,12,Hello!\n" + + "4,30,Hello!\n" + + "5,60,Hello!\n" + + "6,105,Hello!\n"; + } + case 6: { /* * check correctness of all-groupreduce for tuples */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "231,91,Hello World\n"; - } - case 7: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "231,91,Hello World\n"; + } + case 7: { /* * check correctness of all-groupreduce for custom types */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce()); - - reduceDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "91,210,Hello!"; - } - case 8: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce()); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "91,210,Hello!"; + } + case 8: { /* * check correctness of groupReduce with broadcast set */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. - groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints"); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,55\n" + - "5,2,55\n" + - "15,3,55\n" + - "34,4,55\n" + - "65,5,55\n" + - "111,6,55\n"; - } - case 9: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. + groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints"); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,55\n" + + "5,2,55\n" + + "15,3,55\n" + + "34,4,55\n" + + "65,5,55\n" + + "111,6,55\n"; + } + case 9: { /* * check correctness of groupReduce if UDF returns input objects multiple times and changes it in between */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. - groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "11,1,Hi!\n" + - "21,1,Hi again!\n" + - "12,2,Hi!\n" + - "22,2,Hi again!\n" + - "13,2,Hi!\n" + - "23,2,Hi again!\n"; - } - case 10: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. + groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "11,1,Hi!\n" + + "21,1,Hi again!\n" + + "12,2,Hi!\n" + + "22,2,Hi again!\n" + + "13,2,Hi!\n" + + "23,2,Hi again!\n"; + } + case 10: { /* * check correctness of groupReduce on custom type with key extractor and combine */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); - DataSet<CustomType> reduceDs = ds. - groupBy(new KeySelector<CustomType, Integer>() { - private static final long serialVersionUID = 1L; - @Override - public Integer getKey(CustomType in) { - return in.myInt; - } - }).reduceGroup(new CustomTypeGroupReduceWithCombine()); - - reduceDs.writeAsText(resultPath); - env.execute(); - - // return expected result - return "1,0,test1\n" + - "2,3,test2\n" + - "3,12,test3\n" + - "4,30,test4\n" + - "5,60,test5\n" + - "6,105,test6\n"; - } - case 11: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> reduceDs = ds. + groupBy(new KeySelector<CustomType, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + }).reduceGroup(new CustomTypeGroupReduceWithCombine()); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1,0,test1\n" + + "2,3,test2\n" + + "3,12,test3\n" + + "4,30,test4\n" + + "5,60,test5\n" + + "6,105,test6\n"; + } + case 11: { /* * check correctness of groupReduce on tuples with combine */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<Integer, String>> reduceDs = ds. - groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,test1\n" + - "5,test2\n" + - "15,test3\n" + - "34,test4\n" + - "65,test5\n" + - "111,test6\n"; - } - // all-groupreduce with combine - case 12: { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, String>> reduceDs = ds. + groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,test1\n" + + "5,test2\n" + + "15,test3\n" + + "34,test4\n" + + "65,test5\n" + + "111,test6\n"; + } + // all-groupreduce with combine + case 12: { /* * check correctness of all-groupreduce for tuples with combine */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env) - .map(new IdentityMapper<Tuple3<Integer,Long,String>>()).setParallelism(4); - - Configuration cfg = new Configuration(); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION); - DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine()) - .withParameters(cfg); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n"; - } - // descending sort not working - case 13: { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env) + .map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4); + + Configuration cfg = new Configuration(); + cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION); + DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine()) + .withParameters(cfg); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n"; + } + // descending sort not working + case 13: { /* * check correctness of groupReduce on tuples with key field selector and group sorting */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setDegreeOfParallelism(1); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. - groupBy(1).sortGroup(2,Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce()); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,Hi\n" + - "5,2,Hello world-Hello\n" + - "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" + - "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" + - "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" + - "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"; - - } - default: - throw new IllegalArgumentException("Invalid program id"); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> reduceDs = ds. + groupBy(1).sortGroup(2, Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,Hi\n" + + "5,2,Hello world-Hello\n" + + "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" + + "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" + + "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" + + "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n"; + + } + default: { + throw new IllegalArgumentException("Invalid program id"); + } } } } - public static class Tuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { + public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { private static final long serialVersionUID = 1L; @@ -436,7 +441,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class Tuple3SortedGroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + public static class Tuple3SortedGroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @@ -462,7 +467,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class Tuple5GroupReduce extends GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + public static class Tuple5GroupReduce implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { private static final long serialVersionUID = 1L; @Override @@ -486,7 +491,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class CustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> { + public static class CustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> { private static final long serialVersionUID = 1L; @@ -511,8 +516,9 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - - public static class InputReturningTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + + + public static class InputReturningTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override @@ -534,7 +540,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class AllAddingTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + public static class AllAddingTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override @@ -554,7 +560,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class AllAddingCustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> { + public static class AllAddingCustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> { private static final long serialVersionUID = 1L; @Override @@ -579,7 +585,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static class BCTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> { + public static class BCTuple3GroupReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private String f2Replace = ""; @@ -613,8 +619,8 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable - public static class Tuple3GroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { + @RichGroupReduceFunction.Combinable + public static class Tuple3GroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; @Override @@ -650,8 +656,8 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable - public static class Tuple3AllGroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { + @RichGroupReduceFunction.Combinable + public static class Tuple3AllGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { private static final long serialVersionUID = 1L; @Override @@ -686,8 +692,8 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable - public static class CustomTypeGroupReduceWithCombine extends GroupReduceFunction<CustomType, CustomType> { + @RichGroupReduceFunction.Combinable + public static class CustomTypeGroupReduceWithCombine extends RichGroupReduceFunction<CustomType, CustomType> { private static final long serialVersionUID = 1L; @Override @@ -723,7 +729,7 @@ public class GroupReduceITCase extends JavaProgramTestBase { } } - public static final class IdentityMapper<T> extends MapFunction<T, T> { + public static final class IdentityMapper<T> extends RichMapFunction<T, T> { @Override public T map(T value) { return value; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index ffad949..a293cbf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -23,8 +23,10 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.functions.RichFlatJoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; @@ -33,6 +35,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -101,7 +104,7 @@ public class JoinITCase extends JavaProgramTestBase { ds1.join(ds2) .where(1) .equalTo(1) - .with(new T3T5Join()); + .with(new T3T5FlatJoin()); joinDs.writeAsCsv(resultPath); env.execute(); @@ -126,7 +129,7 @@ public class JoinITCase extends JavaProgramTestBase { ds1.join(ds2) .where(0,1) .equalTo(0,4) - .with(new T3T5Join()); + .with(new T3T5FlatJoin()); joinDs.writeAsCsv(resultPath); env.execute(); @@ -177,7 +180,7 @@ public class JoinITCase extends JavaProgramTestBase { DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2) .where(1) .equalTo(1) - .with(new T3T5Join()); + .with(new T3T5FlatJoin()); joinDs.writeAsCsv(resultPath); env.execute(); @@ -202,7 +205,7 @@ public class JoinITCase extends JavaProgramTestBase { ds1.joinWithTiny(ds2) .where(1) .equalTo(1) - .with(new T3T5Join()); + .with(new T3T5FlatJoin()); joinDs.writeAsCsv(resultPath); env.execute(); @@ -292,35 +295,35 @@ public class JoinITCase extends JavaProgramTestBase { } case 9: { - /* - * Join on a tuple input with key field selector and a custom type input with key extractor - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + /* + * Join on a tuple input with key field selector and a custom type input with key extractor + */ - DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env); - DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); - DataSet<Tuple2<String, String>> joinDs = - ds1.join(ds2) - .where(new KeySelector<CustomType, Integer>() { - @Override - public Integer getKey(CustomType value) { - return value.myInt; - } - } - ) - .equalTo(0) - .with(new CustT3Join()); - - joinDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "Hi,Hi\n" + - "Hello,Hello\n" + - "Hello world,Hello\n"; - - } + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<String, String>> joinDs = + ds1.join(ds2) + .where(new KeySelector<CustomType, Integer>() { + @Override + public Integer getKey(CustomType value) { + return value.myInt; + } + } + ) + .equalTo(0) + .with(new CustT3Join()); + + joinDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "Hi,Hi\n" + + "Hello,Hello\n" + + "Hello world,Hello\n"; + + } case 10: { /* @@ -458,38 +461,39 @@ public class JoinITCase extends JavaProgramTestBase { } - public static class T3T5Join extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> { + public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> { @Override - public Tuple2<String, String> join(Tuple3<Integer, Long, String> first, - Tuple5<Integer, Long, Integer, String, Long> second) { - - return new Tuple2<String,String>(first.f2, second.f3); + public void join(Tuple3<Integer, Long, String> first, + Tuple5<Integer, Long, Integer, String, Long> second, + Collector<Tuple2<String,String>> out) { + + out.collect (new Tuple2<String,String> (first.f2, second.f3)); } - + } - public static class LeftReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { + public static class LeftReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { @Override public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> first, - Tuple5<Integer, Long, Integer, String, Long> second) { + Tuple5<Integer, Long, Integer, String, Long> second) { return first; } } - public static class RightReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + public static class RightReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { @Override public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> first, - Tuple5<Integer, Long, Integer, String, Long> second) { + Tuple5<Integer, Long, Integer, String, Long> second) { return second; } } - public static class T3T5BCJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> { + public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> { private int broadcast; @@ -505,6 +509,7 @@ public class JoinITCase extends JavaProgramTestBase { } + /* @Override public Tuple3<String, String, Integer> join( Tuple3<Integer, Long, String> first, @@ -512,19 +517,25 @@ public class JoinITCase extends JavaProgramTestBase { return new Tuple3<String, String, Integer>(first.f2, second.f3, broadcast); } + */ + + @Override + public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, String, Integer>> out) throws Exception { + out.collect(new Tuple3<String, String, Integer> (first.f2, second.f3, broadcast)); + } } - public static class T3CustJoin extends JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> { + public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> { @Override public Tuple2<String, String> join(Tuple3<Integer, Long, String> first, - CustomType second) { + CustomType second) { return new Tuple2<String, String>(first.f2, second.myString); } } - public static class CustT3Join extends JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> { + public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> { @Override public Tuple2<String, String> join(CustomType first, Tuple3<Integer, Long, String> second) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java index 0921e82..4f1fb1a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java @@ -25,7 +25,8 @@ import java.util.LinkedList; import org.junit.Assert; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; @@ -396,7 +397,7 @@ public class MapITCase extends JavaProgramTestBase { DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds. - map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { + map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); private Integer f2Replace = 0; @@ -457,7 +458,7 @@ public class MapITCase extends JavaProgramTestBase { final int testValue = 666; conf.setInteger(testKey, testValue); DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds. - map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { + map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java index 6cc1061..a296a09 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java @@ -23,8 +23,9 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.Configuration; @@ -270,7 +271,7 @@ public class ReduceITCase extends JavaProgramTestBase { "65,5,Hi again!\n" + "111,6,Hi again!\n"; } - default: + default: throw new IllegalArgumentException("Invalid program id"); } @@ -278,7 +279,7 @@ public class ReduceITCase extends JavaProgramTestBase { } - public static class Tuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> { + public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); private final String f2Replace; @@ -306,7 +307,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class Tuple5Reduce extends ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> { + public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> { private static final long serialVersionUID = 1L; private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); @@ -321,7 +322,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class CustomTypeReduce extends ReduceFunction<CustomType> { + public static class CustomTypeReduce implements ReduceFunction<CustomType> { private static final long serialVersionUID = 1L; private final CustomType out = new CustomType(); @@ -336,7 +337,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class InputReturningTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> { + public static class InputReturningTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override @@ -350,7 +351,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class AllAddingTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> { + public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); @@ -364,7 +365,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class AllAddingCustomTypeReduce extends ReduceFunction<CustomType> { + public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> { private static final long serialVersionUID = 1L; private final CustomType out = new CustomType(); @@ -379,7 +380,7 @@ public class ReduceITCase extends JavaProgramTestBase { } } - public static class BCTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> { + public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>(); private String f2Replace = ""; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java index 09191cc..a636ba4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java @@ -23,7 +23,7 @@ import java.io.IOException; import java.util.Collection; import java.util.LinkedList; -import org.apache.flink.api.java.functions.FilterFunction; +import org.apache.flink.api.java.functions.RichFilterFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; @@ -145,7 +145,7 @@ public class UnionITCase extends JavaProgramTestBase { // Don't know how to make an empty result in an other way than filtering it DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env). - filter(new FilterFunction<Tuple3<Integer,Long,String>>() { + filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java index ed573be..aaad08c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; import org.apache.flink.types.StringValue; import org.apache.flink.util.Collector; +import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -49,6 +50,7 @@ import java.util.LinkedList; /** */ @RunWith(Parameterized.class) +//@Ignore("Test needs to be adapted to new cross signature") public class CrossITCase extends RecordAPITestBase { private static final Log LOG = LogFactory.getLog(CrossITCase.class); @@ -61,13 +63,30 @@ public class CrossITCase extends RecordAPITestBase { super(testConfig); } - private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n"; + //private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n"; - private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n"; + //private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n"; - private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n" - + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n" - + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n"; + //private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n" + // + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n" + // + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n"; + + //private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 1\n4 2\n4 2\n5 0\n5 0\n5 1\n," + + // "5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" + + // "7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" + + // "8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n"; + + //private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" + + // "7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" + + // "6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" + + // "8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" + + // "6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6"; + + + private static final String LEFT_IN = "1 1\n2 2\n3 3\n"; + private static final String RIGHT_IN = "3 6\n4 4\n4 8\n"; + + private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 4\n9 2\n9 6\n"; @Override protected void preSubmit() throws Exception { @@ -84,7 +103,7 @@ public class CrossITCase extends RecordAPITestBase { private IntValue integer = new IntValue(); @Override - public void cross(Record record1, Record record2, Collector<Record> out) { + public Record cross(Record record1, Record record2) throws Exception { string = record1.getField(1, string); int val1 = Integer.parseInt(string.toString()); string = record2.getField(1, string); @@ -95,16 +114,14 @@ public class CrossITCase extends RecordAPITestBase { int key2 = Integer.parseInt(string.toString()); LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }"); - - if (val1 + val2 <= 6) { - string.setValue((key1 + key2 + 2) + ""); - integer.setValue(val2 - val1 + 1); - - record1.setField(0, string); - record1.setField(1, integer); - - out.collect(record1); - } + + string.setValue((key1 + key2 + 2) + ""); + integer.setValue(val2 - val1 + 1); + + record1.setField(0, string); + record1.setField(1, integer); + + return record1; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java index a545e05..0f58d18 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java @@ -47,19 +47,19 @@ public class ComputeDistance extends CrossFunction implements Serializable { * 3: distance */ @Override - public void cross(Record dataPointRecord, Record clusterCenterRecord, Collector<Record> out) { - + public Record cross(Record dataPointRecord, Record clusterCenterRecord) throws Exception { + CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class); - + IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class); CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class); - + this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint)); - - // add cluster center id and distance to the data point record + + // add cluster center id and distance to the data point record dataPointRecord.setField(2, clusterCenterId); dataPointRecord.setField(3, this.distance); - - out.collect(dataPointRecord); + + return dataPointRecord; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java index 15640c0..441dc39 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java @@ -89,7 +89,7 @@ public class TPCHQuery3 implements Program, ProgramDescription { /** * Reads the filter literals from the configuration. * - * @see org.apache.flink.api.common.functions.Function#open(org.apache.flink.configuration.Configuration) + * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration) */ @Override public void open(Configuration parameters) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java index 97367f7..7149cd3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java @@ -24,8 +24,8 @@ import java.util.Collection; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Program; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -170,7 +170,7 @@ public class KMeansForTest implements Program { // ************************************************************************* /** Converts a Tuple2<Double,Double> into a Point. */ - public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> { + public static final class TuplePointConverter extends RichMapFunction<Tuple2<Double, Double>, Point> { @Override public Point map(Tuple2<Double, Double> t) throws Exception { @@ -179,7 +179,7 @@ public class KMeansForTest implements Program { } /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */ - public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> { + public static final class TupleCentroidConverter extends RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> { @Override public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception { @@ -188,7 +188,7 @@ public class KMeansForTest implements Program { } /** Determines the closest cluster center for a data point. */ - public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> { + public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> { private Collection<Centroid> centroids; /** Reads the centroid values from a broadcast variable into a collection. */ @@ -236,7 +236,7 @@ public class KMeansForTest implements Program { } /** Appends a count variable to the tuple. */ - public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> { + public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> { @Override public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) { @@ -245,7 +245,7 @@ public class KMeansForTest implements Program { } /** Sums and counts point coordinates. */ - public static final class CentroidAccumulator extends ReduceFunction<DummyTuple3IntPointLong> { + public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> { @Override public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) { @@ -254,7 +254,7 @@ public class KMeansForTest implements Program { } /** Computes new centroid from coordinate sum and count of points. */ - public static final class CentroidAverager extends MapFunction<DummyTuple3IntPointLong, Centroid> { + public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> { @Override public Centroid map(DummyTuple3IntPointLong value) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7e0f153..8058539 100644 --- a/pom.xml +++ b/pom.xml @@ -274,6 +274,9 @@ under the License. <activeByDefault>false</activeByDefault> <jdk>1.8</jdk> </activation> + <modules> + <module>flink-java8-tests</module> + </modules> <build> <plugins> <plugin>
