http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java index 265ce75..c0ea26a 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java @@ -22,12 +22,12 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.api.java.DataSet; @@ -134,7 +134,7 @@ public class EnumTrianglesOpt { // ************************************************************************* /** Converts a Tuple2 into an Edge */ - public static class TupleEdgeConverter extends MapFunction<Tuple2<Integer, Integer>, Edge> { + public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { private final 
Edge outEdge = new Edge(); @Override @@ -145,7 +145,7 @@ public class EnumTrianglesOpt { } /** Emits for an edge the original edge and its switched version. */ - private static class EdgeDuplicator extends FlatMapFunction<Edge, Edge> { + private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> { @Override public void flatMap(Edge edge, Collector<Edge> out) throws Exception { @@ -160,7 +160,7 @@ public class EnumTrianglesOpt { * Emits one edge for each input edge with a degree annotation for the shared vertex. * For each emitted edge, the first vertex is the vertex with the smaller id. */ - private static class DegreeCounter extends GroupReduceFunction<Edge, EdgeWithDegrees> { + private static class DegreeCounter implements GroupReduceFunction<Edge, EdgeWithDegrees> { final ArrayList<Integer> otherVertices = new ArrayList<Integer>(); final EdgeWithDegrees outputEdge = new EdgeWithDegrees(); @@ -208,7 +208,7 @@ public class EnumTrianglesOpt { * Builds an edge with degree annotation from two edges that have the same vertices and only one * degree annotation. */ - private static class DegreeJoiner extends ReduceFunction<EdgeWithDegrees> { + private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> { private final EdgeWithDegrees outEdge = new EdgeWithDegrees(); @Override @@ -228,7 +228,7 @@ public class EnumTrianglesOpt { } /** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */ - private static class EdgeByDegreeProjector extends MapFunction<EdgeWithDegrees, Edge> { + private static class EdgeByDegreeProjector implements MapFunction<EdgeWithDegrees, Edge> { private final Edge outEdge = new Edge(); @@ -249,7 +249,7 @@ public class EnumTrianglesOpt { } /** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. 
*/ - private static class EdgeByIdProjector extends MapFunction<Edge, Edge> { + private static class EdgeByIdProjector implements MapFunction<Edge, Edge> { @Override public Edge map(Edge inEdge) throws Exception { @@ -268,7 +268,7 @@ public class EnumTrianglesOpt { * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. * Assumes that input edges share the first vertex and are in ascending order of the second vertex. */ - private static class TriadBuilder extends GroupReduceFunction<Edge, Triad> { + private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { private final List<Integer> vertices = new ArrayList<Integer>(); private final Triad outTriad = new Triad(); @@ -300,7 +300,7 @@ public class EnumTrianglesOpt { } /** Filters triads (three vertices connected by two edges) without a closing third edge. */ - private static class TriadFilter extends JoinFunction<Triad, Edge, Triad> { + private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> { @Override public Triad join(Triad triad, Edge edge) throws Exception { @@ -332,7 +332,7 @@ public class EnumTrianglesOpt { System.out.println("Executing Enum Triangles Opt example with built-in default data."); System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>"); + System.out.println(" Usage: EnumTriangleOpt <edge path> <result path>"); } return true; }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java index 18eba5d..ba9754f 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java @@ -23,10 +23,10 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM; import java.util.ArrayList; import java.util.Iterator; -import org.apache.flink.api.java.functions.FilterFunction; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -139,7 +139,7 @@ public class PageRankBasic { /** * A map function that assigns an initial rank to all pages. */ - public static final class RankAssigner extends MapFunction<Tuple1<Long>, Tuple2<Long, Double>> { + public static final class RankAssigner implements MapFunction<Tuple1<Long>, Tuple2<Long, Double>> { Tuple2<Long, Double> outPageWithRank; public RankAssigner(double rank) { @@ -158,7 +158,7 @@ public class PageRankBasic { * originate. 
Run as a pre-processing step. */ @ConstantFields("0") - public static final class BuildOutgoingEdgeList extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { + public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { private final ArrayList<Long> neighbors = new ArrayList<Long>(); @@ -179,7 +179,7 @@ public class PageRankBasic { /** * Join function that distributes a fraction of a vertex's rank to all neighbors. */ - public static final class JoinVertexWithEdgesMatch extends FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> { + public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> { @Override public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){ @@ -197,7 +197,7 @@ public class PageRankBasic { * The function that applies the page rank dampening formula */ @ConstantFields("0") - public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> { + public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> { private final double dampening; private final double randomJump; @@ -217,7 +217,7 @@ public class PageRankBasic { /** * Filter that filters vertices where the rank difference is below a threshold. 
*/ - public static final class EpsilonFilter extends FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> { + public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> { @Override public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java index d8d8b62..22054da 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java @@ -19,6 +19,8 @@ package org.apache.flink.example.java.graph; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.IterativeDataSet; @@ -26,8 +28,6 @@ import org.apache.flink.api.java.IterativeDataSet; import java.util.Iterator; import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.example.java.graph.util.ConnectedComponentsData; import org.apache.flink.util.Collector; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java index 1d687f3..0868732 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java @@ -22,8 +22,9 @@ package org.apache.flink.example.java.ml; import java.io.Serializable; import java.util.Collection; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.example.java.ml.util.LinearRegressionData; @@ -183,7 +184,7 @@ public class LinearRegression { // ************************************************************************* /** Converts a Tuple2<Double,Double> into a Data. */ - public static final class TupleDataConverter extends MapFunction<Tuple2<Double, Double>, Data> { + public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> { @Override public Data map(Tuple2<Double, Double> t) throws Exception { @@ -192,7 +193,7 @@ public class LinearRegression { } /** Converts a Tuple2<Double,Double> into a Params. 
*/ - public static final class TupleParamsConverter extends MapFunction<Tuple2<Double, Double>,Params> { + public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>,Params> { @Override public Params map(Tuple2<Double, Double> t)throws Exception { @@ -203,7 +204,7 @@ public class LinearRegression { /** * Compute a single BGD type update for every parameters. */ - public static class SubUpdate extends MapFunction<Data,Tuple2<Params,Integer>>{ + public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> { private Collection<Params> parameters; @@ -234,7 +235,7 @@ public class LinearRegression { /** * Accumulator all the update. * */ - public static class UpdateAccumulator extends ReduceFunction<Tuple2<Params, Integer>> { + public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> { @Override public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) { @@ -250,7 +251,7 @@ public class LinearRegression { /** * Compute the final update by average them. 
*/ - public static class Update extends MapFunction<Tuple2<Params, Integer>,Params>{ + public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> { @Override public Params map(Tuple2<Params, Integer> arg0) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java index 61b90dd..4bced17 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FilterFunction; +import org.apache.flink.api.java.functions.RichFilterFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -149,7 +149,7 @@ public class EmptyFieldsCountAccumulator { * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}). 
*/ - public static final class EmptyFieldFilter extends FilterFunction<Tuple> { + public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> { // create a new accumulator in each filter function instance // accumulators can be merged later on http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java index 48cdedc..08a261c 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java @@ -18,8 +18,8 @@ package org.apache.flink.example.java.relational; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java index ef03e6f..1ff6583 100644 --- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java @@ -18,9 +18,9 @@ package org.apache.flink.example.java.relational; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FilterFunction; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java index 52109ea..4544fd4 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java @@ -25,9 +25,9 @@ import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FilterFunction; -import org.apache.flink.api.java.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java index 2649d24..3033c0d 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java @@ -21,8 +21,8 @@ package org.apache.flink.example.java.relational; import java.util.Iterator; -import org.apache.flink.api.java.functions.CoGroupFunction; -import org.apache.flink.api.java.functions.FilterFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -159,7 +159,7 @@ public class WebLogAnalysis { * MapFunction that filters for documents that contain a certain set of * keywords. */ - public static class FilterDocByKeyWords extends FilterFunction<Tuple2<String, String>> { + public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> { private static final String[] KEYWORDS = { " editors ", " oscillations " }; @@ -187,7 +187,7 @@ public class WebLogAnalysis { /** * MapFunction that filters for records where the rank exceeds a certain threshold. 
*/ - public static class FilterByRank extends FilterFunction<Tuple3<Integer, String, Integer>> { + public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> { private static final int RANKFILTER = 40; @@ -210,7 +210,7 @@ public class WebLogAnalysis { * MapFunction that filters for records of the visits relation where the year * (from the date string) is equal to a certain value. */ - public static class FilterVisitsByDate extends FilterFunction<Tuple2<String, String>> { + public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> { private static final int YEARFILTER = 2007; @@ -237,7 +237,7 @@ public class WebLogAnalysis { * If the first input does not provide any pairs, all pairs of the second input are emitted. * Otherwise, no pair is emitted. */ - public static class AntiJoinVisits extends CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> { + public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> { /** * If the visit iterator is empty, all pairs of the rank iterator are emitted. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java index a18abcb..3e95ccd 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java @@ -18,7 +18,7 @@ package org.apache.flink.example.java.wordcount; -import org.apache.flink.api.java.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.api.java.DataSet; @@ -91,7 +91,7 @@ public class WordCount { * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
*/ - public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> { + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java index 32a8997..04810a1 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java @@ -18,8 +18,8 @@ package org.apache.flink.example.java.wordcount; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.util.Collector; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -111,7 +111,7 @@ public class WordCountPOJO { * FlatMapFunction. The function takes a line (String) and splits it into * multiple WC POJOs as "(word, 1)". 
*/ - public static final class Tokenizer extends FlatMapFunction<String, WC> { + public static final class Tokenizer implements FlatMapFunction<String, WC> { @Override public void flatMap(String value, Collector<WC> out) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml index 917461a..ae46681 100644 --- a/flink-examples/pom.xml +++ b/flink-examples/pom.xml @@ -50,5 +50,4 @@ under the License. <module>flink-java-examples</module> <module>flink-scala-examples</module> </modules> - </project> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 894880e..e7199f9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -19,42 +19,39 @@ package org.apache.flink.api.java; import org.apache.commons.lang3.Validate; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.CoGroupFunction; -import org.apache.flink.api.java.functions.FilterFunction; -import 
org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.CoGroupOperator; +import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets; import org.apache.flink.api.java.operators.CrossOperator; +import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; import org.apache.flink.api.java.operators.CustomUnaryOperation; import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.api.java.operators.DistinctOperator; import org.apache.flink.api.java.operators.FilterOperator; import org.apache.flink.api.java.operators.FlatMapOperator; -import org.apache.flink.api.java.operators.Grouping; -import org.apache.flink.api.java.operators.JoinOperator; +import org.apache.flink.api.java.operators.JoinOperator.JoinHint; +import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.operators.MapOperator; -import org.apache.flink.api.java.operators.ProjectOperator; -import org.apache.flink.api.java.operators.ReduceGroupOperator; +import org.apache.flink.api.java.operators.ProjectOperator.Projection; +import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.ReduceOperator; -import org.apache.flink.api.java.operators.SortedGrouping; import org.apache.flink.api.java.operators.UnionOperator; import 
org.apache.flink.api.java.operators.UnsortedGrouping; -import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets; -import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; -import org.apache.flink.api.java.operators.JoinOperator.JoinHint; -import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; -import org.apache.flink.api.java.operators.ProjectOperator.Projection; import org.apache.flink.api.java.record.functions.CrossFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; @@ -68,8 +65,8 @@ import org.apache.flink.types.TypeInformation; * A DataSet represents a collection of elements of the same type.<br/> * A DataSet can be transformed into another DataSet by applying a transformation as for example * <ul> - * <li>{@link DataSet#map(MapFunction)},</li> - * <li>{@link DataSet#reduce(ReduceFunction)},</li> + * <li>{@link DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li> + * <li>{@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li> * <li>{@link DataSet#join(DataSet)}, or</li> * <li>{@link DataSet#coGroup(DataSet)}.</li> * </ul> @@ -124,13 +121,13 @@ public abstract class DataSet<T> { /** * Applies a Map transformation on a {@link DataSet}.<br/> - * The transformation calls a {@link MapFunction} for each element of the DataSet. + * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet. * Each MapFunction call returns exactly one element. * * @param mapper The MapFunction that is called for each element of the DataSet. * @return A MapOperator that represents the transformed DataSet. 
* - * @see MapFunction + * @see org.apache.flink.api.java.functions.RichMapFunction * @see MapOperator * @see DataSet */ @@ -138,18 +135,21 @@ public abstract class DataSet<T> { if (mapper == null) { throw new NullPointerException("Map function must not be null."); } + if (FunctionUtils.isSerializedLambdaFunction(mapper)) { + throw new UnsupportedLambdaExpressionException(); + } return new MapOperator<T, R>(this, mapper); } /** * Applies a FlatMap transformation on a {@link DataSet}.<br/> - * The transformation calls a {@link FlatMapFunction} for each element of the DataSet. + * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet. * Each FlatMapFunction call can return any number of elements including none. * * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. * @return A FlatMapOperator that represents the transformed DataSet. * - * @see FlatMapFunction + * @see org.apache.flink.api.java.functions.RichFlatMapFunction * @see FlatMapOperator * @see DataSet */ @@ -157,19 +157,22 @@ public abstract class DataSet<T> { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null."); } + if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) { + throw new UnsupportedLambdaExpressionException(); + } return new FlatMapOperator<T, R>(this, flatMapper); } /** * Applies a Filter transformation on a {@link DataSet}.<br/> - * The transformation calls a {@link FilterFunction} for each element of the DataSet + * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet * and retains only those element for which the function returns true. Elements for * which the function returns false are filtered. * * @param filter The FilterFunction that is called for each element of the DataSet. * @return A FilterOperator that represents the filtered DataSet. 
* - * @see FilterFunction + * @see org.apache.flink.api.java.functions.RichFilterFunction * @see FilterOperator * @see DataSet */ @@ -179,6 +182,7 @@ public abstract class DataSet<T> { } return new FilterOperator<T>(this, filter); } + // -------------------------------------------------------------------------------------------- // Projections @@ -264,14 +268,14 @@ public abstract class DataSet<T> { /** * Applies a Reduce transformation on a non-grouped {@link DataSet}.<br/> - * The transformation consecutively calls a {@link ReduceFunction} + * The transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction} * until only a single element remains which is the result of the transformation. * A ReduceFunction combines two elements into one new element of the same type. * * @param reducer The ReduceFunction that is applied on the DataSet. * @return A ReduceOperator that represents the reduced DataSet. * - * @see ReduceFunction + * @see org.apache.flink.api.java.functions.RichReduceFunction * @see ReduceOperator * @see DataSet */ @@ -284,24 +288,27 @@ public abstract class DataSet<T> { /** * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.<br/> - * The transformation calls a {@link GroupReduceFunction} once with the full DataSet. + * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} once with the full DataSet. * The GroupReduceFunction can iterate over all elements of the DataSet and emit any * number of output elements including none. * * @param reducer The GroupReduceFunction that is applied on the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. 
* - * @see GroupReduceFunction - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.functions.RichGroupReduceFunction + * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ - public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { + public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - return new ReduceGroupOperator<T, R>(this, reducer); + if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + throw new UnsupportedLambdaExpressionException(); + } + return new GroupReduceOperator<T, R>(this, reducer); } - + // -------------------------------------------------------------------------------------------- // distinct // -------------------------------------------------------------------------------------------- @@ -359,8 +366,8 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to apply a GroupReduce transformation. + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. 
@@ -372,7 +379,7 @@ public abstract class DataSet<T> { * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ public <K extends Comparable<K>> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) { @@ -388,8 +395,8 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to apply a GroupReduce transformation. + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param fields One or more field positions on which the DataSet will be grouped. @@ -401,7 +408,7 @@ public abstract class DataSet<T> { * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ public UnsortedGrouping<T> groupBy(int... fields) { @@ -417,8 +424,8 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to apply a GroupReduce transformation. 
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param fields One or more field expressions on which the DataSet will be grouped. @@ -430,7 +437,7 @@ public abstract class DataSet<T> { * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet */ public UnsortedGrouping<T> groupBy(String... fields) { @@ -461,7 +468,7 @@ public abstract class DataSet<T> { public <R> JoinOperatorSets<T, R> join(DataSet<R> other) { return new JoinOperatorSets<T, R>(this, other); } - + /** * Initiates a Join transformation. <br/> * A Join transformation joins the elements of two @@ -514,7 +521,7 @@ public abstract class DataSet<T> { * Initiates a CoGroup transformation.<br/> * A CoGroup transformation combines the elements of * two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and - * gives groups of both DataSets with equal keys together into a {@link CoGroupFunction}. + * gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.java.functions.RichCoGroupFunction}. 
* If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction * is called with an empty group for the non-existing group.</br> * The CoGroupFunction can iterate over the elements of both groups and return any number http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java index bb53a89..2f2eae0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java @@ -189,7 +189,7 @@ public class DeltaIteration<ST, WT> { * The value of an aggregator can be accessed in the next iteration. * <p> * Aggregators can be accessed inside a function via the - * {@link org.apache.flink.api.common.functions.AbstractFunction#getIterationRuntimeContext()} method. + * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method. * * @param name The name under which the aggregator is registered. * @param aggregator The aggregator class. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index d00fb47..ebd1422 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment { * individually override this value to use a specific degree of parallelism via * {@link Operator#setParallelism(int)}. Other operations may need to run with a different * degree of parallelism - for example calling - * {@link DataSet#reduce(org.apache.flink.api.java.functions.ReduceFunction)} over the entire + * {@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over the entire * set will insert eventually an operation that runs non-parallel (degree of parallelism of one). * * @return The degree of parallelism used by operations, unless they override that value. This method @@ -550,7 +550,7 @@ public abstract class ExecutionEnvironment { * The runtime will copy the files temporarily to a local cache, if needed. * <p> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via - * {@link org.apache.flink.api.common.functions.Function#getRuntimeContext()} and provides access + * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access * {@link org.apache.flink.api.common.cache.DistributedCache} via * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. * @@ -568,7 +568,7 @@ public abstract class ExecutionEnvironment { * The runtime will copy the files temporarily to a local cache, if needed. 
* <p> * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via - * {@link org.apache.flink.api.common.functions.Function#getRuntimeContext()} and provides access + * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access * {@link org.apache.flink.api.common.cache.DistributedCache} via * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. * http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java index 6443dbb..5dc0d2e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java @@ -93,7 +93,7 @@ public class IterativeDataSet<T> extends SingleInputOperator<T, T, IterativeData * The value of an aggregator can be accessed in the next iteration. * <p> * Aggregators can be accessed inside a function via the - * {@link org.apache.flink.api.common.functions.AbstractFunction#getIterationRuntimeContext()} method. + * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()} method. * * @param name The name under which the aggregator is registered. * @param aggregator The aggregator class. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java deleted file mode 100644 index 201794a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import java.util.Iterator; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCoGrouper; -import org.apache.flink.util.Collector; - -/** - * The abstract base class for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set - * after a key and then "joining" the groups by calling this function with the two sets for each key. - * If a key is present in only one of the two inputs, it may be that one of the groups is empty. 
- * <p> - * The basic syntax for using CoGoup on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * The keys can be defined through tuple field positions or key extractors. - * See {@link org.apache.flink.api.java.operators.Keys} for details. - * <p> - * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked - * with in empty input for the side of the data set that did not contain elements with that specific key. - * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN1> The type of the elements in the first input. - * @param <IN2> The type of the elements in the second input. - * @param <OUT> The type of the result elements. - */ -public abstract class CoGroupFunction<IN1, IN2, OUT> extends AbstractFunction implements GenericCoGrouper<IN1, IN2, OUT> { - - private static final long serialVersionUID = 1L; - - - /** - * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same - * key. The elements of the groups are returned by the respective iterators. - * - * It is possible that one of the two groups is empty, in which case the respective iterator has no elements. - * - * @param first The group from the first input. - * @param second The group from the second input. - * @param out The collector through which to return the result elements. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - @Override - public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java deleted file mode 100644 index 27907ec..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCrosser; -import org.apache.flink.util.Collector; - - -/** - * The abstract base class for Cross functions. Cross functions build a Cartesian produce of their inputs - * and call the function or each pair of elements. 
- * They are a means of convenience and can be used to directly produce manipulate the - * pair of elements, instead of having the operator build 2-tuples, and then using a - * MapFunction over those 2-tuples. - * <p> - * The basic syntax for using Cross on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.cross(set2).with(new MyCrossFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN1> The type of the elements in the first input. - * @param <IN2> The type of the elements in the second input. - * @param <OUT> The type of the result elements. - */ -public abstract class CrossFunction<IN1, IN2, OUT> extends AbstractFunction implements GenericCrosser<IN1, IN2, OUT>{ - - private static final long serialVersionUID = 1L; - - - /** - * The core method of the cross operation. The method will be invoked for each pair of elements - * in the Cartesian product. - * - * @param first The element from the first input. - * @param second The element from the second input. - * @return The result element. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - public abstract OUT cross(IN1 first, IN2 second) throws Exception; - - - - /** - * This method only delegates calls to the {@link #cross(Object, Object)} method. 
- */ - @Override - public final void cross(IN1 record1, IN2 record2, Collector<OUT> out) throws Exception { - out.collect(cross(record1, record2)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java deleted file mode 100644 index aac2086..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericFilter; - -/** - * The abstract base class for Filter functions. A filter function take elements and evaluates a - * predicate on them to decide whether to keep the element, or to discard it. 
- * <p> - * The basic syntax for using a FilterFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.filter(new MyFilterFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the FilterFunction needs to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <T> The type of the filtered elements. - */ -public abstract class FilterFunction<T> extends AbstractFunction implements GenericFilter<T> { - - private static final long serialVersionUID = 1L; - - /** - * The core method of the FilterFunction. The method is called for each element in the input, - * and determines whether the element should be kept or filtered out. If the method returns true, - * the element passes the filter and is kept, if the method returns false, the element is - * filtered out. - * - * @param value The input value to be filtered. - * @return Flag to indicate whether to keep the value (true) or to discard it (false). - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public abstract boolean filter(T value) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java deleted file mode 100644 index f9c22cc..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericFlatMap; -import org.apache.flink.util.Collector; - -/** - * The abstract base class for flatMap functions. FlatMap functions take elements and transform them, - * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists - * and arrays. Operations that produce multiple strictly one result element per input element can also - * use the {@link MapFunction}. - * <p> - * The basic syntax for using a FlatMapFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<Y> result = input.flatMap(new MyFlatMapFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the FlatMapFunction needs to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN> Type of the input elements. - * @param <OUT> Type of the returned elements. - */ -public abstract class FlatMapFunction<IN, OUT> extends AbstractFunction implements GenericFlatMap<IN, OUT> { - - private static final long serialVersionUID = 1L; - - /** - * The core method of the FlatMapFunction. 
Takes an element from the input data set and transforms - * it into zero, one, or more elements. - * - * @param value The input value. - * @param out The collector for for emitting result values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java index 5cc8c12..012ab57 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java @@ -23,7 +23,7 @@ import java.util.Iterator; import org.apache.flink.util.Collector; /** - * A variant of the {@link FlatMapFunction} that returns elements through an iterator, rather then + * A variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then * through a collector. In all other respects, it behaves exactly like the FlatMapFunction. * <p> * The function needs to be serializable, as defined in {@link java.io.Serializable}. @@ -31,7 +31,7 @@ import org.apache.flink.util.Collector; * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. 
*/ -public abstract class FlatMapIterator<IN, OUT> extends FlatMapFunction<IN, OUT> { +public abstract class FlatMapIterator<IN, OUT> extends RichFlatMapFunction<IN, OUT> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java index 1015971..b94840f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java @@ -64,15 +64,15 @@ import org.apache.flink.api.common.InvalidProgramException; * </b> * <p> * Be aware that some annotations should only be used for functions with as single input - * ({@link MapFunction}, {@link ReduceFunction}) and some only for stubs with two inputs - * ({@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}). + * ({@link RichMapFunction}, {@link RichReduceFunction}) and some only for stubs with two inputs + * ({@link RichCrossFunction}, {@link RichFlatJoinFunction}, {@link RichCoGroupFunction}). */ public class FunctionAnnotation { /** * This annotation declares that a function leaves certain fields of its input values unmodified and * only "forwards" or "copies" them to the return value. The annotation is applicable to unary - * functions, like for example {@link MapFunction}, {@link ReduceFunction}, or {@link FlatMapFunction}. + * functions, like for example {@link RichMapFunction}, {@link RichReduceFunction}, or {@link RichFlatMapFunction}. 
* <p> * The following example illustrates a function that keeps the tuple's field zero constant: * <pre><blockquote> @@ -103,7 +103,7 @@ public class FunctionAnnotation { /** * This annotation declares that a function leaves certain fields of its first input values unmodified and * only "forwards" or "copies" them to the return value. The annotation is applicable to binary - * functions, like for example {@link JoinFunction}, {@link CoGroupFunction}, or {@link CrossFunction}. + * functions, like for example {@link RichFlatJoinFunction}, {@link RichCoGroupFunction}, or {@link RichCrossFunction}. * <p> * The following example illustrates a join function that copies fields from the first and second input to the * return value: @@ -135,7 +135,7 @@ public class FunctionAnnotation { /** * This annotation declares that a function leaves certain fields of its second input values unmodified and * only "forwards" or "copies" them to the return value. The annotation is applicable to binary - * functions, like for example {@link JoinFunction}, {@link CoGroupFunction}, or {@link CrossFunction}. + * functions, like for example {@link RichFlatJoinFunction}, {@link RichCoGroupFunction}, or {@link RichCrossFunction}. * <p> * The following example illustrates a join function that copies fields from the first and second input to the * return value: @@ -167,7 +167,7 @@ public class FunctionAnnotation { /** * This annotation declares that a function changes certain fields of its input values, while leaving all * others unmodified and in place in the return value. The annotation is applicable to unary - * functions, like for example {@link MapFunction}, {@link ReduceFunction}, or {@link FlatMapFunction}. + * functions, like for example {@link RichMapFunction}, {@link RichReduceFunction}, or {@link RichFlatMapFunction}. 
* <p> * The following example illustrates that at the example of a Map function: * @@ -201,7 +201,7 @@ public class FunctionAnnotation { /** * This annotation declares that a function changes certain fields of its first input value, while leaving all * others unmodified and in place in the return value. The annotation is applicable to binary - * functions, like for example {@link JoinFunction}, {@link CoGroupFunction}, or {@link CrossFunction}. + * functions, like for example {@link RichFlatJoinFunction}, {@link RichCoGroupFunction}, or {@link RichCrossFunction}. * <p> * The following example illustrates a join function that copies fields from the first and second input to the * return value: @@ -238,7 +238,7 @@ public class FunctionAnnotation { /** * This annotation declares that a function changes certain fields of its second input value, while leaving all * others unmodified and in place in the return value. The annotation is applicable to binary - * functions, like for example {@link JoinFunction}, {@link CoGroupFunction}, or {@link CrossFunction}. + * functions, like for example {@link RichFlatJoinFunction}, {@link RichCoGroupFunction}, or {@link RichCrossFunction}. 
* <p> * The following example illustrates a join function that copies fields from the first and second input to the * return value: http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java deleted file mode 100644 index 01ae9c1..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.functions; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.util.Iterator; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.util.Collector; - -/** - * The abstract base class for group reduce functions. Group reduce functions process groups of elements. - * They may aggregate them to a single value, or produce multiple result values for each group. - * <p> - * For a reduce functions that works incrementally by combining always two elements, see - * {@link ReduceFunction}, called via {@link org.apache.flink.api.java.DataSet#reduce(ReduceFunction)}. - * <p> - * The basic syntax for using a grouped GroupReduceFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction()); - * </blockquote></pre> - * <p> - * GroupReduceFunctions may be "combinable", in which case they can pre-reduce partial groups in order to - * reduce the data volume early. See the {@link #combine(Iterator, Collector)} function for details. - * <p> - * Like all functions, the GroupReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN> Type of the elements that this function processes. - * @param <OUT> The type of the elements returned by the user-defined function. - */ -public abstract class GroupReduceFunction<IN, OUT> extends AbstractFunction implements GenericGroupReduce<IN, OUT>, GenericCombine<IN> { - - private static final long serialVersionUID = 1L; - - /** - * Core method of the reduce function. It is called one per group of elements. 
If the reducer - * is not grouped, than the entire data set is considered one group. - * - * @param values The iterator returning the group of values to be reduced. - * @param out The collector to emit the returned values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception; - - /** - * The combine methods pre-reduces elements. It may be called on subsets of the data - * before the actual reduce function. This is often helpful to lower data volume prior - * to reorganizing the data in an expensive way, as might be required for the final - * reduce function. - * <p> - * This method is only ever invoked when the subclass of {@link GroupReduceFunction} - * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining - * the <i>reduceGroup<i> operation via - * {@link org.apache.flink.api.java.operators.ReduceGroupOperator#setCombinable(boolean)}. - * <p> - * Since the reduce function will be called on the result of this method, it is important that this - * method returns the same data type as it consumes. By default, this method only calls the - * {@link #reduce(Iterator, Collector)} method. If the behavior in the pre-reducing is different - * from the final reduce function (for example because the reduce function changes the data type), - * this method must be overwritten, or the execution will fail. - * - * @param values The iterator returning the group of values to be reduced. - * @param out The collector to emit the returned values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - @Override - public void combine(Iterator<IN> values, Collector<IN> out) throws Exception { - @SuppressWarnings("unchecked") - Collector<OUT> c = (Collector<OUT>) out; - reduce(values, c); - } - - // -------------------------------------------------------------------------------------------- - - /** - * This annotation can be added to classes that extend {@link GroupReduceFunction}, in oder to mark - * them as "combinable". The system may call the {@link GroupReduceFunction#combine(Iterator, Collector)} - * method on such functions, to pre-reduce the data before transferring it over the network to - * the actual group reduce operation. - * <p> - * Marking combinable functions as such is in general beneficial for performance. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface Combinable {}; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java index 6cb397b..b363606 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java @@ -23,7 +23,7 @@ import java.util.Iterator; import org.apache.flink.util.Collector; -public abstract class GroupReduceIterator<IN, OUT> extends GroupReduceFunction<IN, OUT> { +public abstract class GroupReduceIterator<IN, OUT> extends RichGroupReduceFunction<IN, OUT> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java 
---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java deleted file mode 100644 index c78e6f3..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericJoiner; -import org.apache.flink.util.Collector; - -/** - * The abstract base class for Join functions. Join functions combine two data sets by joining their - * elements on specified keys and calling this function with each pair of joining elements. - * By default, this follows strictly the semantics of an "inner join" in SQL. - * the semantics are those of an "inner join", meaning that elements are filtered out - * if their key is not contained in the other data set. 
- * <p> - * Per the semantics of an inner join, the function is - * <p> - * The basic syntax for using Join on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * The keys can be defined through tuple field positions or key extractors. - * See {@link org.apache.flink.api.java.operators.Keys} for details. - * <p> - * The Join function is actually not a necessary part of a join operation. If no JoinFunction is provided, - * the result of the operation is a sequence of Tuple2, where the elements in the tuple are those that - * the JoinFunction would have been invoked with. - * <P> - * Note: You can use a {@link CoGroupFunction} to perform an outer join. - * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN1> The type of the elements in the first input. - * @param <IN2> The type of the elements in the second input. - * @param <OUT> The type of the result elements. - */ -public abstract class JoinFunction<IN1, IN2, OUT> extends AbstractFunction implements GenericJoiner<IN1, IN2, OUT> { - - private static final long serialVersionUID = 1L; - - /** - * The user-defined method for performing transformations after a join. - * The method is called with matching pairs of elements from the inputs. - * - * @param first The element from first input. - * @param second The element from second input. - * @return The resulting element. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - public abstract OUT join(IN1 first, IN2 second) throws Exception; - - - /** - * The user-defined method for performing transformations after a join, for operations that - * produce zero elements, or more than one element. - * By default, this method delegates to the method {@link #join(Object, Object)}. If this method - * is overridden, that method will no longer be called. - * - * @param value1 The element from first input. - * @param value2 The element from second input. - * @param out A collector to emit resulting element (zero, one, or many). - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public void join(IN1 value1, IN2 value2, Collector<OUT> out) throws Exception { - out.collect(join(value1, value2)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java deleted file mode 100644 index 64aec2a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericMap; - -/** - * The abstract base class for Map functions. Map functions take elements and transform them, - * element wise. A Map function always produces a single result element for each input element. - * Typical applications are parsing elements, converting data types, or projecting out fields. - * Operations that produce multiple result elements from a single input element can be implemented - * using the {@link FlatMapFunction}. - * <p> - * The basic syntax for using a MapFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<Y> result = input.map(new MyMapFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the MapFunction needs to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <IN> Type of the input elements. - * @param <OUT> Type of the returned elements. - */ -public abstract class MapFunction<IN, OUT> extends AbstractFunction implements GenericMap<IN, OUT> { - - private static final long serialVersionUID = 1L; - - /** - * The core method of the MapFunction. Takes an element from the input data set and transforms - * it into another element. - * - * @param value The input value. - * @return The value produced by the map function from the input value. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - @Override - public abstract OUT map(IN value) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java deleted file mode 100644 index aea6bf8..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.functions; - -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericReduce; - -/** - * The abstract base class for Reduce functions. Reduce functions combine groups of elements to - * a single value, by taking always two elements and combining them into one. Reduce functions - * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced - * individually. 
- * <p> - * For a reduce functions that work on an entire group at the same time (such as the - * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}, called via - * {@link org.apache.flink.api.java.DataSet#reduceGroup(GroupReduceFunction)}. In the general case, - * ReduceFunctions are considered faster, because they allow the system to use hash-based - * execution strategies. - * <p> - * The basic syntax for using a grouped ReduceFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. - * - * @param <T> Type of the elements that this function processes. - */ -public abstract class ReduceFunction<T> extends AbstractFunction implements GenericReduce<T> { - - private static final long serialVersionUID = 1L; - - /** - * The core method of the ReduceFunction, combining two values into one value of the same type. - * The reduce function is consecutively applied to all values of a group until only a single value remains. - * - * @param value1 The first value to combine. - * @param value2 The second value to combine. - * @return The combined value of both input values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - public abstract T reduce(T value1, T value2) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java new file mode 100644 index 0000000..8aaaf86 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.functions; + +import java.util.Iterator; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.util.Collector; + +/** + * The abstract base class for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set + * after a key and then "joining" the groups by calling this function with the two sets for each key. 
+ * If a key is present in only one of the two inputs, it may be that one of the groups is empty. + * <p> + * The basic syntax for using CoGroup on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * The keys can be defined through tuple field positions or key extractors. + * See {@link org.apache.flink.api.java.operators.Keys} for details. + * <p> + * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked + * with an empty input for the side of the data set that did not contain elements with that specific key. + * <p> + * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ +public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + + /** + * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same + * key. The elements of the groups are returned by the respective iterators. + * + * It is possible that one of the two groups is empty, in which case the respective iterator has no elements. + * + * @param first The group from the first input. + * @param second The group from the second input. + * @param out The collector through which to return the result elements. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery.
+ */ + @Override + public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception; + +}
