[FLINK-701] Several cleanups after SAM refactoring. - Lambda detection compiles on earlier java versions - Add lambda detection test. - Fix JavaDocs
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc89e911 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc89e911 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc89e911 Branch: refs/heads/release-0.6 Commit: bc89e911b2cd433bb841beab9e8e513ca0c3525d Parents: 22b24f2 Author: Stephan Ewen <[email protected]> Authored: Thu Jul 31 19:46:14 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 31 20:18:00 2014 +0200 ---------------------------------------------------------------------- .../examples/SpargelConnectedComponents.java | 4 +- .../spargel/java/examples/SpargelPageRank.java | 8 +- .../SpargelPageRankCountingVertices.java | 11 +- .../apache/flink/client/web/JobsServlet.java | 1 - .../org/apache/flink/compiler/PactCompiler.java | 6 +- .../apache/flink/compiler/CompilerTestBase.java | 6 +- .../flink/compiler/util/DummyCrossStub.java | 4 - .../common/functions/AbstractRichFunction.java | 8 +- .../api/common/functions/CoGroupFunction.java | 38 +++++-- .../api/common/functions/CombineFunction.java | 24 ++++- .../api/common/functions/CrossFunction.java | 38 +++++-- .../api/common/functions/ExecutionContext.java | 51 ---------- .../api/common/functions/FilterFunction.java | 25 ++++- .../common/functions/FlatCombineFunction.java | 22 +++- .../api/common/functions/FlatJoinFunction.java | 45 +++++++- .../api/common/functions/FlatMapFunction.java | 23 +++-- .../flink/api/common/functions/Function.java | 1 - .../common/functions/GenericCollectorMap.java | 10 +- .../common/functions/GroupReduceFunction.java | 34 +++++-- .../api/common/functions/JoinFunction.java | 40 +++++++- .../flink/api/common/functions/MapFunction.java | 21 +++- .../api/common/functions/ReduceFunction.java | 26 ++++- .../api/common/functions/RichFunction.java | 1 - .../api/common/functions/RuntimeContext.java | 28 ----- 
.../common/functions/util/FunctionUtils.java | 38 ++++--- .../flink/api/common/operators/Union.java | 1 - .../operators/base/BulkIterationBase.java | 1 + .../base/CollectorMapOperatorBase.java | 2 +- .../api/common/operators/util/OperatorUtil.java | 3 +- .../common/operators/util/OperatorUtilTest.java | 5 + .../java/org/apache/flink/api/java/DataSet.java | 49 ++++----- .../flink/api/java/ExecutionEnvironment.java | 2 +- .../api/java/functions/FlatMapIterator.java | 2 +- .../api/java/functions/RichCoGroupFunction.java | 39 +------ .../api/java/functions/RichCrossFunction.java | 35 +------ .../api/java/functions/RichFilterFunction.java | 28 +---- .../java/functions/RichFlatCombineFunction.java | 9 ++ .../java/functions/RichFlatJoinFunction.java | 43 +------- .../api/java/functions/RichFlatMapFunction.java | 28 +---- .../java/functions/RichGroupReduceFunction.java | 32 +----- .../api/java/functions/RichJoinFunction.java | 12 ++- .../api/java/functions/RichMapFunction.java | 29 +----- .../api/java/functions/RichReduceFunction.java | 35 +------ .../api/java/operators/CoGroupOperator.java | 2 +- .../flink/api/java/operators/CrossOperator.java | 2 +- .../api/java/operators/DistinctOperator.java | 28 ++--- .../api/java/operators/GroupReduceOperator.java | 7 +- .../flink/api/java/operators/Grouping.java | 4 +- .../flink/api/java/operators/JoinOperator.java | 6 +- .../api/java/operators/SortedGrouping.java | 4 +- .../api/java/operators/UnsortedGrouping.java | 2 +- .../PlanUnwrappingReduceGroupOperator.java | 2 +- .../java/record/functions/CoGroupFunction.java | 2 +- .../java/record/functions/CrossFunction.java | 6 +- .../api/java/record/functions/JoinFunction.java | 15 +-- .../api/java/record/functions/MapFunction.java | 2 +- .../java/record/functions/ReduceFunction.java | 2 +- .../api/java/record/operators/MapOperator.java | 1 + .../DeltaIterationTranslationTest.java | 2 +- .../translation/DistrinctTranslationTest.java | 57 +++++++++++ 
.../java/type/extractor/TypeExtractorTest.java | 4 + .../javaApiOperators/lambdas/CoGroupITCase.java | 1 + .../javaApiOperators/lambdas/CrossITCase.java | 1 + .../javaApiOperators/lambdas/FilterITCase.java | 70 ++----------- .../lambdas/FlatJoinITCase.java | 1 + .../javaApiOperators/lambdas/FlatMapITCase.java | 1 + .../lambdas/GroupReduceITCase.java | 1 + .../javaApiOperators/lambdas/JoinITCase.java | 1 + .../lambdas/LambdaExtractionTest.java | 82 +++++++++++++++ .../javaApiOperators/lambdas/MapITCase.java | 1 + .../javaApiOperators/lambdas/ReduceITCase.java | 102 +++++-------------- .../SolutionSetFastUpdateOutputCollector.java | 6 +- .../io/SolutionSetUpdateOutputCollector.java | 2 +- .../task/AbstractIterativePactTask.java | 4 +- .../iterative/task/IterationHeadPactTask.java | 7 +- .../task/IterationIntermediatePactTask.java | 9 +- .../task/IterationSynchronizationSinkTask.java | 4 +- .../iterative/task/IterationTailPactTask.java | 9 +- .../operators/RuntimeExecutionContext.java | 56 ---------- .../flink/test/operators/CrossITCase.java | 18 +--- 80 files changed, 658 insertions(+), 734 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java index a4ba6fa..0f1f26d 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java @@ -18,7 +18,7 @@ package 
org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.spargel.java.MessageIterator; import org.apache.flink.spargel.java.MessagingFunction; @@ -70,7 +70,7 @@ public class SpargelConnectedComponents { * A map function that takes a Long value and creates a 2-tuple out of it: * <pre>(Long value) -> (value, value)</pre> */ - public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> { + public static final class IdAssigner implements MapFunction<Long, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> map(Long value) { return new Tuple2<Long, Long>(value, value); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java index 9dfc327..6d3dc95 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java @@ -18,8 +18,8 @@ package org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.RichFlatMapFunction; -import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.spargel.java.MessageIterator; @@ -48,7 +48,7 @@ public class SpargelPageRank { // 
enumerate some sample edges and assign an initial uniform probability (rank) DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices) - .map(new RichMapFunction<Long, Tuple2<Long, Double>>() { + .map(new MapFunction<Long, Tuple2<Long, Double>>() { public Tuple2<Long, Double> map(Long value) { return new Tuple2<Long, Double>(value, 1.0/numVertices); } @@ -56,7 +56,7 @@ public class SpargelPageRank { // generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices) - .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() { + .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() { public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) { int numOutEdges = (int) (Math.random() * (numVertices / 2)); for (int i = 0; i < numOutEdges; i++) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java index 43c0b84..01d2cd7 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java @@ -18,9 +18,10 @@ package org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import 
org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.RichMapFunction; -import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -53,7 +54,7 @@ public class SpargelPageRankCountingVertices { // generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES) - .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() { + .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() { public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) { int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2)); for (int i = 0; i < numOutEdges; i++) { @@ -67,12 +68,12 @@ public class SpargelPageRankCountingVertices { // count the number of vertices DataSet<Long> count = vertices - .map(new RichMapFunction<Long, Long>() { + .map(new MapFunction<Long, Long>() { public Long map(Long value) { return 1L; } }) - .reduce(new RichReduceFunction<Long>() { + .reduce(new ReduceFunction<Long>() { public Long reduce(Long value1, Long value2) { return value1 + value2; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java index c62f0d7..cd53115 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java @@ -180,7 +180,6 @@ public class JobsServlet extends HttpServlet { // parse the request ServletFileUpload 
uploadHandler = new ServletFileUpload(fileItemFactory); try { - @SuppressWarnings("unchecked") Iterator<FileItem> itr = ((List<FileItem>) uploadHandler.parseRequest(req)).iterator(); // go over the form fields and look for our file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index b55dea0..e22a365 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -37,7 +37,6 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Union; import org.apache.flink.api.common.operators.base.BulkIterationBase; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; -import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase; import org.apache.flink.api.common.operators.base.CrossOperatorBase; import org.apache.flink.api.common.operators.base.DeltaIterationBase; import org.apache.flink.api.common.operators.base.FilterOperatorBase; @@ -648,6 +647,7 @@ public class PactCompiler { this.forceDOP = forceDOP; } + @SuppressWarnings("deprecation") @Override public boolean preVisit(Operator<?> c) { // check if we have been here before @@ -671,8 +671,8 @@ public class PactCompiler { else if (c instanceof MapOperatorBase) { n = new MapNode((MapOperatorBase<?, ?, ?>) c); } - else if (c instanceof CollectorMapOperatorBase) { - n = new CollectorMapNode((CollectorMapOperatorBase<?, ?, ?>) c); + else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) { + n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, 
?>) c); } else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java index b2c163b..0115c31 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Set; import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.GenericDataSourceBase; @@ -179,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { } @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) { + public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) { List<PlanNode> nodes = this.map.get(name); if (nodes == null || nodes.isEmpty()) { throw new RuntimeException("No node found with the given name and stub class."); @@ -241,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { } @SuppressWarnings("unchecked") - public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) { + public <T extends Operator<?>> T getNode(String name, Class<? 
extends Function> stubClass) { List<Operator<?>> nodes = this.map.get(name); if (nodes == null || nodes.isEmpty()) { throw new RuntimeException("No node found with the given name and stub class."); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java index 736ee14..d6ecc40 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java @@ -16,18 +16,14 @@ * limitations under the License. */ - package org.apache.flink.compiler.util; -import java.io.Serializable; - import org.apache.flink.api.java.record.functions.CrossFunction; import org.apache.flink.types.Record; public class DummyCrossStub extends CrossFunction { private static final long serialVersionUID = 1L; - @Override public Record cross(Record first, Record second) throws Exception { return first; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java index 07b957d..981de14 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.api.common.functions; import java.io.Serializable; @@ -24,9 +23,10 @@ import java.io.Serializable; import org.apache.flink.configuration.Configuration; /** - * An abstract stub implementation for user-defined functions. It offers default implementations - * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the - * {@link RuntimeContext} and {@link IterationRuntimeContext}. + * An abstract stub implementation for rich user-defined functions. + * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and + * teardown ({@link #close()}), as well as access to their runtime execution context via + * {@link #getRuntimeContext()}. */ public abstract class AbstractRichFunction implements RichFunction, Serializable { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java index 5c200af..cc8fe78 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; @@ -24,18 +23,41 @@ import java.util.Iterator; import org.apache.flink.util.Collector; - +/** + * The interface for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set + * after a key and then "joining" the groups by calling this function with the two sets for each key. + * If a key is present in only one of the two inputs, it may be that one of the groups is empty. 
+ * <p> + * The basic syntax for using CoGroup on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * <p> + * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked + * with an empty input for the side of the data set that did not contain elements with that specific key. + * + * @param <V1> The data type of the first input data set. + * @param <V2> The data type of the second input data set. + * @param <O> The data type of the returned elements. + */ public interface CoGroupFunction<V1, V2, O> extends Function, Serializable { /** * This method must be implemented to provide a user implementation of a - * coGroup. It is called for each two key-value pairs that share the same - * key and come from different inputs. + * coGroup. It is called for each pair of element groups where the elements share the + * same key. + * + * @param first The records from the first input. + * @param second The records from the second input. + * @param out A collector to return elements. * - * @param first The records from the first input which were paired with the key. - * @param second The records from the second input which were paired with the key. - * @param out A collector that collects all output pairs. + * @throws Exception The function may throw Exceptions, which will cause the program to cancel, + * and may trigger the recovery logic.
+ */ void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception; - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java index d72c4c8..aed7fae 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java @@ -16,16 +16,34 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; import java.util.Iterator; /** - * Generic interface used for combiners. + * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction} + * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but + * only a sub-group. + * <p> + * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to + * reduce the data volume earlier, before the entire groups have been collected. + * <p> + * This special variant of the combine function reduces the group of elements into a single element. A variant + * that can return multiple values per group is defined in {@link FlatCombineFunction}. + * + * @param <T> The data type processed by the combine function. */ public interface CombineFunction<T> extends Function, Serializable { - T combine(Iterator<T> records) throws Exception; + /** + * The combine method, called (potentially multiple times) with subgroups of elements. + * + * @param values The elements to be combined. + * @return The single resulting value.
+ * @throws Exception The function may throw Exceptions, which will cause the program to cancel, + * and may trigger the recovery logic. + */ + T combine(Iterator<T> values) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java index 0c8bc97..5e2c122 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java @@ -16,27 +16,43 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; - /** - * @param <IN1> First input type - * @param <IN2> Second input type - * @param <OUT> Output type + * Interface for Cross functions. Cross functions are applied to the Cartesian product of their inputs + * and are called for each pair of elements. + * + * They are optional, a means of convenience that can be used to directly manipulate the + * pair of elements, instead of processing 2-tuples that contain the pairs. + * <p> + * The basic syntax for using Cross on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.cross(set2).with(new MyCrossFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements.
*/ public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable { /** - * User defined function for the cross operator. + * Cross UDF method. Called once per pair of elements in the Cartesian product of the inputs. + * + * @param val1 Element from first input. + * @param val2 Element from the second input. + * @return The result element. * - * @param record1 Record from first input - * @param record2 Record from the second input - * @return result of cross UDF. - * @throws Exception + * @throws Exception The function may throw Exceptions, which will cause the program to cancel, + * and may trigger the recovery logic. */ - OUT cross(IN1 record1, IN2 record2) throws Exception; + OUT cross(IN1 val1, IN2 val2) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java deleted file mode 100644 index 9540dc1..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.common.functions; - - -/** - * The execution context provides basic information about the parallel runtime context - * in which a stub instance lives. Such information includes the current number of - * parallel stub instances, the stub's parallel task index, the pact name, or the iteration context. - */ -public interface ExecutionContext { - - /** - * Gets the name of the task. This is the name supplied to the contract upon instantiation. If - * no name was given at contract instantiation time, a default name will be returned. - * - * @return The task's name. - */ - String getTaskName(); - - /** - * Gets the number of parallel subtasks in which the stub is executed. - * - * @return The number of parallel subtasks in which the stub is executed. - */ - int getNumberOfSubtasks(); - - /** - * Gets the subtask's parallel task number. - * - * @return The subtask's parallel task number. 
- */ - int getSubtaskIndex(); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java index 2f68477..2380f60 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java @@ -16,18 +16,33 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; + import java.io.Serializable; +/** + * Base interface for Filter functions. A filter function takes elements and evaluates a + * predicate on them to decide whether to keep the element, or to discard it. + * <p> + * The basic syntax for using a FilterFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.filter(new MyFilterFunction()); + * </blockquote></pre> + * + * @param <T> The type of the filtered elements. + */ public interface FilterFunction<T> extends Function, Serializable { /** - * User defined function for a filter. + * The filter function that evaluates the predicate. + * + * @param value The value to be filtered. + * @return True for values that should be retained, false for values to be filtered out. * - * @param value Incoming tuples - * @return true for tuples that are allowed to pass the filter - * @throws Exception + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery.
+ */ boolean filter(T value) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java index b2c8f30..edf1614 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; @@ -25,9 +24,28 @@ import java.util.Iterator; import org.apache.flink.util.Collector; /** - * Generic interface used for combiners. + * Generic interface used for combine functions ("combiners"). Combiners act as auxiliaries to a {@link GroupReduceFunction} + * and "pre-reduce" the data. The combine functions typically do not see the entire group of elements, but + * only a sub-group. + * <p> + * Combine functions are frequently helpful in increasing the program efficiency, because they allow the system to + * reduce the data volume earlier, before the entire groups have been collected. + * <p> + * This special variant of the combine function supports returning more than one element per group. + * It is frequently less efficient to use than the {@link CombineFunction}. + * + * @param <T> The data type processed by the combine function. */ public interface FlatCombineFunction<T> extends Function, Serializable { + /** + * The combine method, called (potentially multiple times) with subgroups of elements. + * + * @param values The elements to be combined. + * @param out The collector to use to return values from the function.
+ * + * @throws Exception The function may throw Exceptions, which will cause the program to cancel, + * and may trigger the recovery logic. + */ void combine(Iterator<T> values, Collector<T> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java index 6a6b971..3ad2c82 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java @@ -16,15 +16,54 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import org.apache.flink.util.Collector; import java.io.Serializable; - +/** + * Interface for Join functions. Joins combine two data sets by joining their + * elements on specified keys. This function is called with each pair of joining elements. + * <p> + * This particular variant of the join function supports returning zero, one, or more result values + * per pair of joining values. + * <p> + * By default, the join strictly follows the semantics of an "inner join" in SQL. + * That is, elements are filtered out + * if their key is not contained in the other data set. + * <p> + * The basic syntax for using Join on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input.
+ * <p> + * The Join function is an optional part of a join operation. If no JoinFunction is provided, + * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that + * the JoinFunction would have been invoked with. + * <P> + * Note: You can use a {@link CoGroupFunction} to perform an outer join. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable { - void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception; + /** + * The join method, called once per joined pair of elements. + * + * @param first The element from first input. + * @param second The element from second input. + * @param out The collector used to return zero, one, or more elements. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java index a8696cf..0c32eae 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java @@ -16,27 +16,36 @@ * limitations under the License. 
*/ - package org.apache.flink.api.common.functions; import org.apache.flink.util.Collector; import java.io.Serializable; - /** - * - * @param <T> - * @param <O> + * Base interface for flatMap functions. FlatMap functions take elements and transform them + * into zero, one, or more elements. Typical applications are splitting elements or unnesting lists + * and arrays. Operations that produce strictly one result element per input element can also + * use the {@link MapFunction}. + * <p> + * The basic syntax for using a FlatMapFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<Y> result = input.flatMap(new MyFlatMapFunction()); + * </blockquote></pre> + * + * @param <T> Type of the input elements. + * @param <O> Type of the returned elements. */ public interface FlatMapFunction<T, O> extends Function, Serializable { /** - * The core method of FlatMappable. Takes an element from the input data set and transforms + * The core method of the FlatMapFunction. Takes an element from the input data set and transforms * it into zero, one, or more elements. * * @param value The input value. - * @param out The collector for for emitting result values. + * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java index c2a201f..9e9d88d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java @@ -24,5 +24,4 @@ package org.apache.flink.api.common.functions; * can be called as Java 8 lambdas */ public interface Function { - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java index 41cfa1d..edac4a8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java @@ -16,13 +16,17 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import org.apache.flink.util.Collector; - - +/** + * Variant of the flat map that is used for backwards compatibility in the deprecated Record API. + * + * @param <T> The input data type. + * @param <O> The result data type.
+ */ +@Deprecated public interface GenericCollectorMap<T, O> extends RichFunction { void map(T record, Collector<O> out) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java index 984d1fd..5fa959c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; @@ -24,22 +23,35 @@ import java.util.Iterator; import org.apache.flink.util.Collector; - /** - * - * @param <T> Incoming types - * @param <O> Outgoing types + * The interface for group reduce functions. GroupReduceFunctions process groups of elements. + * They may aggregate them to a single value, or produce multiple result values for each group. + * The group may be defined by sharing a common grouping key, or the group may simply be + * all elements of a data set. + * <p> + * For a reduce function that works incrementally by always combining two elements, see + * {@link ReduceFunction}. + * <p> + * The basic syntax for using a grouped GroupReduceFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction()); + * </blockquote></pre> + * + * @param <T> Type of the elements that this function processes. + * @param <O> The type of the elements returned by the user-defined function.
+ */ public interface GroupReduceFunction<T, O> extends Function, Serializable { + /** + * The reduce method. The function receives one call per group of elements. * - * The central function to be implemented for a reducer. The function receives per call one - * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly - * one function call across all involved instances across all computing nodes. - * - * @param records All records that belong to the given input key. + * @param values All elements that belong to the group. * @param out The collector to hand results to. - * @throws Exception + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. */ void reduce(Iterator<T> values, Collector<O> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java index 02f526a..9a8943c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java @@ -16,13 +16,49 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import java.io.Serializable; - +/** + * Interface for Join functions. Joins combine two data sets by joining their + * elements on specified keys. This function is called with each pair of joining elements. + * <p> + * By default, the join strictly follows the semantics of an "inner join" in SQL.
+ * That is, elements are filtered out + * if their key is not contained in the other data set. + * <p> + * The basic syntax for using Join on two data sets is as follows: + * <pre><blockquote> + * DataSet<X> set1 = ...; + * DataSet<Y> set2 = ...; + * + * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction()); + * </blockquote></pre> + * <p> + * {@code set1} is here considered the first input, {@code set2} the second input. + * <p> + * The Join function is an optional part of a join operation. If no JoinFunction is provided, + * the result of the operation is a sequence of 2-tuples, where the elements in the tuple are those that + * the JoinFunction would have been invoked with. + * <p> + * Note: You can use a {@link CoGroupFunction} to perform an outer join. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable { + /** + * The join method, called once per joined pair of elements. + * + * @param first The element from first input. + * @param second The element from second input. + * @return The resulting element. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery.
+ */ OUT join(IN1 first, IN2 second) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java index 4e2520d..dccc980 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java @@ -16,16 +16,31 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; - import java.io.Serializable; +/** + * Base interface for Map functions. Map functions take elements and transform them, + * element wise. A Map function always produces a single result element for each input element. + * Typical applications are parsing elements, converting data types, or projecting out fields. + * Operations that produce multiple result elements from a single input element can be implemented + * using the {@link FlatMapFunction}. + * <p> + * The basic syntax for using a MapFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<Y> result = input.map(new MyMapFunction()); + * </blockquote></pre> + * + * @param <T> Type of the input elements. + * @param <O> Type of the returned elements. + */ public interface MapFunction<T, O> extends Function, Serializable { /** - * The core method of Mappable. Takes an element from the input data set and transforms + * The mapping method. Takes an element from the input data set and transforms * it into exactly one element. * * @param value The input value. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java index 04f690a..7a61c97 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java @@ -16,16 +16,36 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; - import java.io.Serializable; +/** + * Base interface for Reduce functions. Reduce functions combine groups of elements to + * a single value, by always taking two elements and combining them into one. Reduce functions + * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced + * individually. + * <p> + * For reduce functions that work on an entire group at the same time (such as the + * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case, + * ReduceFunctions are considered faster, because they allow the system to use more efficient + * execution strategies. + * <p> + * The basic syntax for using a grouped ReduceFunction is as follows: + * <pre><blockquote> + * DataSet<X> input = ...; + * + * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction()); + * </blockquote></pre> + * <p> + * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * + * @param <T> Type of the elements that this function processes. + */ public interface ReduceFunction<T> extends Function, Serializable { /** - * The core method of Reducible, combining two values into one value of the same type.
+ * The core method of ReduceFunction, combining two values into one value of the same type. * The reduce function is consecutively applied to all values of a group until only a single value remains. * * @param value1 The first value to combine. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java index ffc3ac2..8f53252 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.functions; import org.apache.flink.configuration.Configuration; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 3cf30ff..e18858b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -105,34 +105,6 @@ public interface RuntimeContext { * Convenience function to create a counter object for histograms. */ Histogram getHistogram(String name); - -// /** -// * I propose to remove this and only keep the other more explicit functions -// * (to add or get an accumulator object) -// * -// * Get an existing or new named accumulator object. 
Use this function to get -// * an counter for an custom accumulator type. For the integrated -// * accumulators you better use convenience functions (e.g. getIntCounter). -// * -// * There is no need to register accumulators - they will be created when a -// * UDF asks the first time for a counter that does not exist yet locally. -// * This implies that there can be conflicts when a counter is requested with -// * the same name but with different types, either in the same UDF or in -// * different. In the last case the conflict occurs during merging. -// * -// * @param name -// * @param accumulatorClass -// * If the accumulator was not created previously -// * @return -// */ -// <V, A> Accumulator<V, A> getAccumulator(String name, -// Class<? extends Accumulator<V, A>> accumulatorClass); -// -// /** -// * See getAccumulable -// */ -// <T> SimpleAccumulator<T> getSimpleAccumulator(String name, -// Class<? extends SimpleAccumulator<T>> accumulatorClass); // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java index bc4ffd0..6455c77 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java @@ -18,18 +18,15 @@ package org.apache.flink.api.common.functions.util; - import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; 
-import java.lang.invoke.SerializedLambda; import java.lang.reflect.Method; public class FunctionUtils { - public static void openFunction (Function function, Configuration parameters) throws Exception{ if (function instanceof RichFunction) { RichFunction richFunction = (RichFunction) function; @@ -61,21 +58,30 @@ public class FunctionUtils { } } - public static boolean isSerializedLambdaFunction(Function function) { - Class<?> clazz = function.getClass(); - try { - Method replaceMethod = clazz.getDeclaredMethod("writeReplace"); - replaceMethod.setAccessible(true); - Object serializedForm = replaceMethod.invoke(function); - if (serializedForm instanceof SerializedLambda) { - return true; + public static boolean isLambdaFunction(Function function) { + if (function == null) { + throw new IllegalArgumentException(); + } + + for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) { + try { + Method replaceMethod = clazz.getDeclaredMethod("writeReplace"); + replaceMethod.setAccessible(true); + Object serialVersion = replaceMethod.invoke(function); + + if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) { + return true; + } } - else { - return false; + catch (NoSuchMethodException e) { + // thrown if the method is not there. fall through the loop + } + catch (Throwable t) { + // this should not happen, we are not executing any method code. 
+ throw new RuntimeException("Error while checking whether function is a lambda.", t); } } - catch (Exception e) { - return false; - } + + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java index c416765..6761c17 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators; import org.apache.flink.api.common.functions.AbstractRichFunction; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java index 66bea7f..c91e6ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java @@ -48,6 +48,7 @@ import org.apache.flink.util.Visitor; /** * */ +@SuppressWarnings("deprecation") public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator { private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>"; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index ef00a46..6909dd4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.GenericCollectorMap; @@ -33,6 +32,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; * * @see GenericCollectorMap */ +@Deprecated public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java index 7d6495b..2683583 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java @@ -48,6 +48,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; /** * Convenience methods when dealing with {@link Operator}s. 
*/ +@SuppressWarnings("deprecation") public class OperatorUtil { @SuppressWarnings("rawtypes") @@ -126,7 +127,7 @@ public class OperatorUtil { * @param inputs * all input contracts to this contract */ - @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes", "unchecked" }) public static void setInputs(final Operator<?> contract, final List<List<Operator>> inputs) { if (contract instanceof GenericDataSinkBase) { if (inputs.size() != 1) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java index 30091ab..88809bb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java @@ -42,6 +42,7 @@ import org.junit.Test; /** * Tests {@link OperatorUtil}. 
*/ +@SuppressWarnings("deprecation") public class OperatorUtilTest { /** * Test {@link OperatorUtil#getContractClass(Class)} @@ -115,13 +116,17 @@ public class OperatorUtilTest { assertEquals(GenericDataSourceBase.class, result); } + @SuppressWarnings("serial") static abstract class CoGrouper implements CoGroupFunction<IntValue, IntValue, IntValue> {} + @SuppressWarnings("serial") static abstract class Crosser implements CrossFunction<IntValue, IntValue, IntValue> {} static abstract class Mapper implements GenericCollectorMap<IntValue, IntValue> {} + @SuppressWarnings("serial") static abstract class Matcher implements FlatJoinFunction<IntValue, IntValue, IntValue> {} + @SuppressWarnings("serial") static abstract class Reducer implements GroupReduceFunction<IntValue, IntValue> {} } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index e7199f9..3d1238a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -50,9 +50,9 @@ import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.ProjectOperator.Projection; import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.operators.ReduceOperator; +import org.apache.flink.api.java.operators.SortedGrouping; import org.apache.flink.api.java.operators.UnionOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; -import org.apache.flink.api.java.record.functions.CrossFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; @@ -65,8 +65,8 @@ 
import org.apache.flink.types.TypeInformation; * A DataSet represents a collection of elements of the same type.<br/> * A DataSet can be transformed into another DataSet by applying a transformation as for example * <ul> - * <li>{@link DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li> - * <li>{@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li> + * <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li> + * <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li> * <li>{@link DataSet#join(DataSet)}, or</li> * <li>{@link DataSet#coGroup(DataSet)}.</li> * </ul> @@ -135,7 +135,7 @@ public abstract class DataSet<T> { if (mapper == null) { throw new NullPointerException("Map function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(mapper)) { + if (FunctionUtils.isLambdaFunction(mapper)) { throw new UnsupportedLambdaExpressionException(); } return new MapOperator<T, R>(this, mapper); @@ -157,7 +157,7 @@ public abstract class DataSet<T> { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) { + if (FunctionUtils.isLambdaFunction(flatMapper)) { throw new UnsupportedLambdaExpressionException(); } return new FlatMapOperator<T, R>(this, flatMapper); @@ -197,13 +197,13 @@ public abstract class DataSet<T> { * * @param fieldIndexes The field indexes of the input tuples that are retained. * The order of fields in the output tuple corresponds to the order of field indexes. - * @return A Projection that needs to be converted into a {@link ProjectOperator} to complete the + * @return A Projection that needs to be converted into a {@link org.apache.flink.api.java.operators.ProjectOperator} to complete the * Project transformation by calling {@link Projection#types()}. 
* * @see Tuple * @see DataSet * @see Projection - * @see ProjectOperator + * @see org.apache.flink.api.java.operators.ProjectOperator */ public Projection<T> project(int... fieldIndexes) { return new Projection<T>(this, fieldIndexes); @@ -303,7 +303,7 @@ public abstract class DataSet<T> { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } return new GroupReduceOperator<T, R>(this, reducer); @@ -366,17 +366,15 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. * @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet. 
* * @see KeySelector - * @see Grouping * @see UnsortedGrouping - * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator * @see org.apache.flink.api.java.operators.GroupReduceOperator @@ -395,17 +393,15 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param fields One or more field positions on which the DataSet will be grouped. * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. * * @see Tuple - * @see Grouping * @see UnsortedGrouping - * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator * @see org.apache.flink.api.java.operators.GroupReduceOperator @@ -424,17 +420,15 @@ public abstract class DataSet<T> { * <ul> * <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. * <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation. - * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} to apply a Reduce transformation. - * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)} to apply a GroupReduce transformation. 
+ * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * </ul> * * @param fields One or more field expressions on which the DataSet will be grouped. * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. * * @see Tuple - * @see Grouping * @see UnsortedGrouping - * @see SortedGrouping * @see AggregateOperator * @see ReduceOperator * @see org.apache.flink.api.java.operators.GroupReduceOperator @@ -462,7 +456,6 @@ public abstract class DataSet<T> { * @return A JoinOperatorSets to continue the definition of the Join transformation. * * @see JoinOperatorSets - * @see JoinOperator * @see DataSet */ public <R> JoinOperatorSets<T, R> join(DataSet<R> other) { @@ -484,7 +477,6 @@ public abstract class DataSet<T> { * @return A JoinOperatorSets to continue the definition of the Join transformation. * * @see JoinOperatorSets - * @see JoinOperator * @see DataSet */ public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) { @@ -506,7 +498,6 @@ public abstract class DataSet<T> { * @return A JoinOperatorSet to continue the definition of the Join transformation. * * @see JoinOperatorSets - * @see JoinOperator * @see DataSet */ public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) { @@ -570,7 +561,7 @@ public abstract class DataSet<T> { * second input being the second field of the tuple. * * <p> - * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for + * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(CrossFunction)} to define a {@link CrossFunction} which is called for * each pair of crossed elements. 
The CrossFunction returns a exactly one element for each pair of input elements.</br> * * @param other The other DataSet with which this DataSet is crossed. @@ -599,14 +590,15 @@ public abstract class DataSet<T> { * second input being the second field of the tuple. * * <p> - * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for + * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a + * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br> * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. * * @see DefaultCross - * @see CrossFunction + * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet * @see Tuple2 */ @@ -628,14 +620,15 @@ public abstract class DataSet<T> { * second input being the second field of the tuple. * * <p> - * Call {@link DefaultCross.with(CrossFunction)} to define a {@link CrossFunction} which is called for + * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a + * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br> * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. 
* * @see DefaultCross - * @see CrossFunction + * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet * @see Tuple2 */ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index ebd1422..af0ea52 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment { * individually override this value to use a specific degree of parallelism via * {@link Operator#setParallelism(int)}. Other operations may need to run with a different * degree of parallelism - for example calling - * {@link DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over the entire + * {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire * set will insert eventually an operation that runs non-parallel (degree of parallelism of one). * * @return The degree of parallelism used by operations, unless they override that value. 
This method http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java index 012ab57..0a83235 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java @@ -23,7 +23,7 @@ import java.util.Iterator; import org.apache.flink.util.Collector; /** - * A variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then + * A convenience variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather than * through a collector. In all other respects, it behaves exactly like the FlatMapFunction. * <p> * The function needs to be serializable, as defined in {@link java.io.Serializable}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java index 8aaaf86..e78c31e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java @@ -22,29 +22,14 @@ import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; /** - * The abstract base class for CoGroup functions. CoGroup functions combine two data sets by first grouping each data set - * after a key and then "joining" the groups by calling this function with the two sets for each key. - * If a key is present in only one of the two inputs, it may be that one of the groups is empty. - * <p> - * The basic syntax for using CoGoup on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * The keys can be defined through tuple field positions or key extractors. - * See {@link org.apache.flink.api.java.operators.Keys} for details. - * <p> - * Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked - * with in empty input for the side of the data set that did not contain elements with that specific key. 
- * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN1> The type of the elements in the first input. * @param <IN2> The type of the elements in the second input. @@ -54,20 +39,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFun private static final long serialVersionUID = 1L; - - /** - * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same - * key. The elements of the groups are returned by the respective iterators. - * - * It is possible that one of the two groups is empty, in which case the respective iterator has no elements. - * - * @param first The group from the first input. - * @param second The group from the second input. - * @param out The collector through which to return the result elements. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ @Override public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java index a4e1248..58be279 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java @@ -20,26 +20,13 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.CrossFunction; - +import org.apache.flink.api.common.functions.RichFunction; /** - * The abstract base class for Cross functions. Cross functions build a Cartesian produce of their inputs - * and call the function or each pair of elements. - * They are a means of convenience and can be used to directly produce manipulate the - * pair of elements, instead of having the operator build 2-tuples, and then using a - * MapFunction over those 2-tuples. - * <p> - * The basic syntax for using Cross on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.cross(set2).with(new MyCrossFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link CrossFunction}. 
As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN1> The type of the elements in the first input. * @param <IN2> The type of the elements in the second input. @@ -48,19 +35,7 @@ import org.apache.flink.api.common.functions.CrossFunction; public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> { private static final long serialVersionUID = 1L; - - /** - * The core method of the cross operation. The method will be invoked for each pair of elements - * in the Cartesian product. - * - * @param first The element from the first input. - * @param second The element from the second input. - * @return The result element. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ @Override public abstract OUT cross(IN1 first, IN2 second) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java index e3baa74..9057a0f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java @@ -20,19 +20,13 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichFunction; /** - * The abstract base class for Filter functions. A filter function take elements and evaluates a - * predicate on them to decide whether to keep the element, or to discard it. - * <p> - * The basic syntax for using a FilterFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.filter(new MyFilterFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the FilterFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <T> The type of the filtered elements. 
*/ @@ -40,18 +34,6 @@ public abstract class RichFilterFunction<T> extends AbstractRichFunction impleme private static final long serialVersionUID = 1L; - /** - * The core method of the FilterFunction. The method is called for each element in the input, - * and determines whether the element should be kept or filtered out. If the method returns true, - * the element passes the filter and is kept, if the method returns false, the element is - * filtered out. - * - * @param value The input value to be filtered. - * @return Flag to indicate whether to keep the value (true) or to discard it (false). - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ @Override public abstract boolean filter(T value) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java index 8c326c6..97c15ff 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java @@ -21,10 +21,19 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; import java.util.Iterator; +/** + * Rich variant of the {@link FlatCombineFunction}. 
As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param <T> The data type of the elements to be combined. + */ public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java index 15b4539..6918364 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java @@ -20,36 +20,14 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; /** - * The abstract base class for Join functions. Join functions combine two data sets by joining their - * elements on specified keys and calling this function with each pair of joining elements. - * By default, this follows strictly the semantics of an "inner join" in SQL. - * the semantics are those of an "inner join", meaning that elements are filtered out - * if their key is not contained in the other data set. 
- * <p> - * Per the semantics of an inner join, the function is - * <p> - * The basic syntax for using Join on two data sets is as follows: - * <pre><blockquote> - * DataSet<X> set1 = ...; - * DataSet<Y> set2 = ...; - * - * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyJoinFunction()); - * </blockquote></pre> - * <p> - * {@code set1} is here considered the first input, {@code set2} the second input. - * The keys can be defined through tuple field positions or key extractors. - * See {@link org.apache.flink.api.java.operators.Keys} for details. - * <p> - * The Join function is actually not a necessary part of a join operation. If no JoinFunction is provided, - * the result of the operation is a sequence of Tuple2, where the elements in the tuple are those that - * the JoinFunction would have been invoked with. - * <P> - * Note: You can use a {@link RichCoGroupFunction} to perform an outer join. - * <p> - * All functions need to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN1> The type of the elements in the first input. * @param <IN2> The type of the elements in the second input. @@ -59,17 +37,6 @@ public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFu private static final long serialVersionUID = 1L; - /** - * The user-defined method for performing transformations after a join. - * The method is called with matching pairs of elements from the inputs. - * - * @param first The element from first input. - * @param second The element from second input. - * @return The resulting element. - * - * @throws Exception This method may throw exceptions. 
Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ @Override public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception; }
