[FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22b24f20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22b24f20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22b24f20 Branch: refs/heads/release-0.6 Commit: 22b24f208de2501311fe504e767526690742e30e Parents: 9d1c49f Author: Kostas Tzoumas <[email protected]> Authored: Fri Jul 18 15:37:19 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 31 17:47:35 2014 +0200 ---------------------------------------------------------------------- .../flink/api/avro/AvroOutputFormatTest.java | 6 +- .../avro/testjar/AvroExternalJarProgram.java | 8 +- .../mapred/example/WordCount.java | 8 +- .../mapreduce/example/WordCount.java | 8 +- .../spargel/java/VertexCentricIteration.java | 8 +- .../examples/SpargelConnectedComponents.java | 4 +- .../spargel/java/examples/SpargelPageRank.java | 8 +- .../SpargelPageRankCountingVertices.java | 14 +- .../SpargelConnectedComponentsITCase.java | 4 +- .../apache/flink/client/testjar/WordCount.java | 4 +- .../flink/compiler/util/NoOpFunction.java | 4 +- .../compiler/BranchingPlansCompilerTest.java | 5 +- .../CachedMatchStrategyCompilerTest.java | 5 +- .../compiler/CoGroupSolutionSetFirstTest.java | 9 +- .../apache/flink/compiler/CompilerTestBase.java | 8 +- .../compiler/FeedbackPropertiesMatchTest.java | 10 +- .../compiler/GroupReduceCompilationTest.java | 20 +- .../flink/compiler/IterationsCompilerTest.java | 20 +- .../flink/compiler/ReduceCompilationTest.java | 10 +- .../compiler/UnionPropertyPropagationTest.java | 4 +- .../WorksetIterationsJavaApiCompilerTest.java | 18 +- .../testfunctions/DummyFlatJoinFunction.java | 33 ++ .../testfunctions/DummyJoinFunction.java | 32 - .../testfunctions/IdentityGroupReducer.java | 6 +- .../compiler/testfunctions/IdentityMapper.java | 4 +- .../testfunctions/SelectOneReducer.java | 4 +- 
.../testfunctions/Top1GroupReducer.java | 6 +- .../flink/compiler/util/DummyCrossStub.java | 9 +- .../api/common/functions/AbstractFunction.java | 76 --- .../common/functions/AbstractRichFunction.java | 76 +++ .../api/common/functions/CoGroupFunction.java | 41 ++ .../api/common/functions/CombineFunction.java | 31 + .../api/common/functions/CrossFunction.java | 42 ++ .../api/common/functions/FilterFunction.java | 33 ++ .../common/functions/FlatCombineFunction.java | 33 ++ .../api/common/functions/FlatJoinFunction.java | 30 + .../api/common/functions/FlatMapFunction.java | 45 ++ .../flink/api/common/functions/Function.java | 75 +-- .../api/common/functions/GenericCoGrouper.java | 40 -- .../common/functions/GenericCollectorMap.java | 2 +- .../api/common/functions/GenericCombine.java | 32 - .../api/common/functions/GenericCrosser.java | 41 -- .../api/common/functions/GenericFilter.java | 33 -- .../api/common/functions/GenericFlatMap.java | 42 -- .../common/functions/GenericGroupReduce.java | 44 -- .../api/common/functions/GenericJoiner.java | 28 - .../flink/api/common/functions/GenericMap.java | 30 - .../api/common/functions/GenericReduce.java | 26 - .../common/functions/GroupReduceFunction.java | 45 ++ .../api/common/functions/JoinFunction.java | 28 + .../flink/api/common/functions/MapFunction.java | 38 ++ .../api/common/functions/ReduceFunction.java | 39 ++ .../api/common/functions/RichFunction.java | 97 ++++ .../api/common/functions/RuntimeContext.java | 2 +- .../common/functions/util/FunctionUtils.java | 81 +++ .../flink/api/common/operators/Union.java | 6 +- .../operators/base/BulkIterationBase.java | 8 +- .../operators/base/CoGroupOperatorBase.java | 6 +- .../operators/base/CrossOperatorBase.java | 6 +- .../operators/base/DeltaIterationBase.java | 6 +- .../operators/base/FilterOperatorBase.java | 6 +- .../operators/base/FlatMapOperatorBase.java | 6 +- .../operators/base/GroupReduceOperatorBase.java | 17 +- .../common/operators/base/JoinOperatorBase.java | 6 +- 
.../common/operators/base/MapOperatorBase.java | 4 +- .../operators/base/ReduceOperatorBase.java | 6 +- .../api/common/operators/util/OperatorUtil.java | 18 +- .../common/operators/util/OperatorUtilTest.java | 21 +- flink-examples/flink-java-examples/pom.xml | 1 - .../flink/example/java/clustering/KMeans.java | 17 +- .../example/java/graph/ConnectedComponents.java | 24 +- .../example/java/graph/EnumTrianglesBasic.java | 14 +- .../example/java/graph/EnumTrianglesOpt.java | 28 +- .../flink/example/java/graph/PageRankBasic.java | 18 +- .../java/graph/TransitiveClosureNaive.java | 4 +- .../flink/example/java/ml/LinearRegression.java | 15 +- .../relational/EmptyFieldsCountAccumulator.java | 4 +- .../java/relational/RelationalQuery.java | 2 +- .../example/java/relational/TPCHQuery10.java | 4 +- .../example/java/relational/TPCHQuery3.java | 4 +- .../example/java/relational/WebLogAnalysis.java | 12 +- .../flink/example/java/wordcount/WordCount.java | 4 +- .../example/java/wordcount/WordCountPOJO.java | 6 +- flink-examples/pom.xml | 1 - .../java/org/apache/flink/api/java/DataSet.java | 93 +-- .../apache/flink/api/java/DeltaIteration.java | 2 +- .../flink/api/java/ExecutionEnvironment.java | 6 +- .../apache/flink/api/java/IterativeDataSet.java | 2 +- .../api/java/functions/CoGroupFunction.java | 74 --- .../flink/api/java/functions/CrossFunction.java | 76 --- .../api/java/functions/FilterFunction.java | 57 -- .../api/java/functions/FlatMapFunction.java | 59 -- .../api/java/functions/FlatMapIterator.java | 4 +- .../api/java/functions/FunctionAnnotation.java | 16 +- .../api/java/functions/GroupReduceFunction.java | 114 ---- .../api/java/functions/GroupReduceIterator.java | 2 +- .../flink/api/java/functions/JoinFunction.java | 93 --- .../flink/api/java/functions/MapFunction.java | 59 -- .../api/java/functions/ReduceFunction.java | 63 -- .../api/java/functions/RichCoGroupFunction.java | 74 +++ .../api/java/functions/RichCrossFunction.java | 67 +++ 
.../api/java/functions/RichFilterFunction.java | 57 ++ .../java/functions/RichFlatCombineFunction.java | 34 ++ .../java/functions/RichFlatJoinFunction.java | 75 +++ .../api/java/functions/RichFlatMapFunction.java | 59 ++ .../java/functions/RichGroupReduceFunction.java | 114 ++++ .../api/java/functions/RichJoinFunction.java | 31 + .../api/java/functions/RichMapFunction.java | 59 ++ .../api/java/functions/RichReduceFunction.java | 63 ++ .../UnsupportedLambdaExpressionException.java | 30 + .../api/java/operators/AggregateOperator.java | 20 +- .../api/java/operators/CoGroupOperator.java | 52 +- .../flink/api/java/operators/CrossOperator.java | 18 +- .../api/java/operators/DistinctOperator.java | 44 +- .../api/java/operators/FilterOperator.java | 6 +- .../api/java/operators/FlatMapOperator.java | 7 +- .../api/java/operators/GroupReduceOperator.java | 224 +++++++ .../flink/api/java/operators/Grouping.java | 4 +- .../flink/api/java/operators/JoinOperator.java | 163 ++++-- .../flink/api/java/operators/MapOperator.java | 10 +- .../api/java/operators/ProjectOperator.java | 4 +- .../api/java/operators/ReduceGroupOperator.java | 215 ------- .../api/java/operators/ReduceOperator.java | 19 +- .../java/operators/SingleInputUdfOperator.java | 4 +- .../api/java/operators/SortedGrouping.java | 20 +- .../api/java/operators/TwoInputUdfOperator.java | 4 +- .../flink/api/java/operators/UdfOperator.java | 10 +- .../api/java/operators/UnsortedGrouping.java | 24 +- .../translation/KeyExtractingMapper.java | 4 +- .../translation/KeyRemovingMapper.java | 4 +- .../translation/PlanFilterOperator.java | 8 +- .../translation/PlanProjectOperator.java | 10 +- .../PlanUnwrappingCoGroupOperator.java | 13 +- .../translation/PlanUnwrappingJoinOperator.java | 28 +- .../PlanUnwrappingReduceGroupOperator.java | 21 +- .../PlanUnwrappingReduceOperator.java | 7 +- .../translation/TupleKeyExtractingMapper.java | 4 +- .../operators/translation/WrappingFunction.java | 22 +- 
.../java/record/functions/CoGroupFunction.java | 5 +- .../java/record/functions/CrossFunction.java | 19 +- .../api/java/record/functions/JoinFunction.java | 8 +- .../api/java/record/functions/MapFunction.java | 4 +- .../java/record/functions/ReduceFunction.java | 8 +- .../flink/api/java/typeutils/TypeExtractor.java | 66 ++- .../java/typeutils/runtime/TupleComparator.java | 2 +- .../SemanticPropertiesTranslationTest.java | 12 +- .../DeltaIterationTranslationTest.java | 26 +- .../translation/ReduceTranslationTests.java | 10 +- .../java/type/extractor/TypeExtractorTest.java | 126 ++-- flink-java8-tests/pom.xml | 145 +++++ .../javaApiOperators/lambdas/CoGroupITCase.java | 70 +++ .../javaApiOperators/lambdas/CrossITCase.java | 59 ++ .../javaApiOperators/lambdas/FilterITCase.java | 142 +++++ .../lambdas/FlatJoinITCase.java | 58 ++ .../javaApiOperators/lambdas/FlatMapITCase.java | 46 ++ .../lambdas/GroupReduceITCase.java | 84 +++ .../javaApiOperators/lambdas/JoinITCase.java | 58 ++ .../javaApiOperators/lambdas/MapITCase.java | 48 ++ .../javaApiOperators/lambdas/ReduceITCase.java | 160 +++++ .../task/AbstractIterativePactTask.java | 4 +- .../iterative/task/IterationHeadPactTask.java | 4 +- .../task/IterationIntermediatePactTask.java | 4 +- .../iterative/task/IterationTailPactTask.java | 4 +- .../AbstractCachedBuildSideMatchDriver.java | 6 +- .../runtime/operators/AllGroupReduceDriver.java | 24 +- .../runtime/operators/AllReduceDriver.java | 16 +- .../flink/runtime/operators/CoGroupDriver.java | 14 +- .../CoGroupWithSolutionSetFirstDriver.java | 14 +- .../CoGroupWithSolutionSetSecondDriver.java | 14 +- .../flink/runtime/operators/CrossDriver.java | 32 +- .../flink/runtime/operators/FlatMapDriver.java | 16 +- .../operators/GroupReduceCombineDriver.java | 14 +- .../runtime/operators/GroupReduceDriver.java | 16 +- .../JoinWithSolutionSetFirstDriver.java | 14 +- .../JoinWithSolutionSetSecondDriver.java | 14 +- .../flink/runtime/operators/MapDriver.java | 16 +- 
.../flink/runtime/operators/MatchDriver.java | 16 +- .../flink/runtime/operators/NoOpDriver.java | 10 +- .../runtime/operators/ReduceCombineDriver.java | 16 +- .../flink/runtime/operators/ReduceDriver.java | 16 +- .../runtime/operators/RegularPactTask.java | 42 +- .../chaining/ChainedCollectorMapDriver.java | 4 +- .../chaining/ChainedFlatMapDriver.java | 17 +- .../operators/chaining/ChainedMapDriver.java | 14 +- .../ChainedTerminationCriterionDriver.java | 4 +- .../SynchronousChainedCombineDriver.java | 13 +- .../hash/BuildFirstHashMatchIterator.java | 4 +- .../hash/BuildSecondHashMatchIterator.java | 4 +- .../sort/CombiningUnilateralSortMerger.java | 19 +- .../operators/sort/MergeMatchIterator.java | 14 +- .../operators/util/JoinTaskIterator.java | 4 +- .../runtime/operators/CachedMatchTaskTest.java | 7 +- .../operators/CoGroupTaskExternalITCase.java | 9 +- .../runtime/operators/CoGroupTaskTest.java | 11 +- .../operators/CombineTaskExternalITCase.java | 6 +- .../runtime/operators/CombineTaskTest.java | 6 +- .../operators/CrossTaskExternalITCase.java | 6 +- .../flink/runtime/operators/CrossTaskTest.java | 22 +- .../operators/MatchTaskExternalITCase.java | 6 +- .../flink/runtime/operators/MatchTaskTest.java | 14 +- .../operators/ReduceTaskExternalITCase.java | 6 +- .../flink/runtime/operators/ReduceTaskTest.java | 6 +- .../drivers/AllGroupReduceDriverTest.java | 20 +- .../operators/drivers/AllReduceDriverTest.java | 32 +- .../drivers/GroupReduceDriverTest.java | 20 +- .../drivers/ReduceCombineDriverTest.java | 32 +- .../operators/drivers/ReduceDriverTest.java | 32 +- .../operators/hash/HashMatchIteratorITCase.java | 18 +- .../sort/SortMergeMatchIteratorITCase.java | 4 +- .../operators/testutils/DriverTestBase.java | 8 +- .../operators/testutils/TaskTestBase.java | 6 +- .../operators/util/HashVsSortMiniBenchmark.java | 4 +- .../api/scala/functions/CrossFunction.scala | 28 +- .../api/scala/operators/CrossOperator.scala | 69 +-- 
.../api/scala/operators/IterateOperators.scala | 2 +- .../test/compiler/util/CompilerTestBase.java | 4 +- .../test/compiler/util/OperatorResolver.java | 4 +- flink-tests/pom.xml | 2 +- ...ultipleJoinsWithSolutionSetCompilerTest.java | 14 +- .../BulkIterationWithAllReducerITCase.java | 4 +- .../CoGroupConnectedComponentsSecondITCase.java | 12 +- .../DependencyConnectedComponentsITCase.java | 23 +- .../aggregators/AggregatorsITCase.java | 20 +- ...nentsWithParametrizableAggregatorITCase.java | 15 +- ...entsWithParametrizableConvergenceITCase.java | 15 +- .../CustomCompensatableDotProductCoGroup.java | 6 +- .../CustomCompensatableDotProductMatch.java | 8 +- .../CustomCompensatingMap.java | 4 +- .../CustomRankCombiner.java | 10 +- .../test/javaApiOperators/CoGroupITCase.java | 17 +- .../test/javaApiOperators/CrossITCase.java | 15 +- .../test/javaApiOperators/DistinctITCase.java | 4 +- .../test/javaApiOperators/FilterITCase.java | 9 +- .../test/javaApiOperators/FlatMapITCase.java | 5 +- .../javaApiOperators/GroupReduceITCase.java | 578 ++++++++++--------- .../flink/test/javaApiOperators/JoinITCase.java | 105 ++-- .../flink/test/javaApiOperators/MapITCase.java | 7 +- .../test/javaApiOperators/ReduceITCase.java | 19 +- .../test/javaApiOperators/UnionITCase.java | 4 +- .../flink/test/operators/CrossITCase.java | 49 +- .../recordJobs/kmeans/udfs/ComputeDistance.java | 16 +- .../test/recordJobs/relational/TPCHQuery3.java | 2 +- .../flink/test/util/testjar/KMeansForTest.java | 16 +- pom.xml | 3 + 244 files changed, 4106 insertions(+), 2872 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java index 637a5e9..386f318 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java @@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.io.AvroOutputFormat; import org.apache.flink.api.java.record.io.avro.example.User; import org.apache.flink.api.java.tuple.Tuple3; @@ -125,7 +125,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase { } - public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> { + public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> { @Override public User map(Tuple3<String, Integer, String> value) throws Exception { @@ -133,7 +133,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase { } } - public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> { + public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> { @Override public ReflectiveUser map(User value) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java index 
146c72b..75b7da6 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java @@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.flink.api.avro.AvroBaseValue; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -141,7 +141,7 @@ public class AvroExternalJarProgram { // -------------------------------------------------------------------------------------------- - public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> { + public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { private static final long serialVersionUID = 1L; @Override @@ -151,7 +151,7 @@ public class AvroExternalJarProgram { } } - public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> { + public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java index 4e8ffa9..ba09e77 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java @@ -18,6 +18,7 @@ package org.apache.flink.hadoopcompatibility.mapred.example; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -28,8 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat; @@ -88,7 +88,7 @@ public class WordCount { /** * Splits a line into words and converts Hadoop Writables into normal Java data types. */ - public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { + public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { @Override public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { @@ -108,7 +108,7 @@ public class WordCount { /** * Converts Java data types to Hadoop Writables. 
*/ - public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { + public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { @Override public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java index 36ea378..c00a14a 100644 --- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java +++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java @@ -19,8 +19,8 @@ package org.apache.flink.hadoopcompatibility.mapreduce.example; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.hadoop.fs.Path; @@ -89,7 +89,7 @@ public class WordCount { /** * Splits a line into words and converts Hadoop Writables into normal Java data types. 
*/ - public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { + public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { @Override public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { @@ -109,7 +109,7 @@ public class WordCount { /** * Converts Java data types to Hadoop Writables. */ - public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { + public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { @Override public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java index bb84cea..65be2f8 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java @@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; -import org.apache.flink.api.java.functions.CoGroupFunction; +import org.apache.flink.api.java.functions.RichCoGroupFunction; import org.apache.flink.api.java.operators.CoGroupOperator; import org.apache.flink.api.java.operators.CustomUnaryOperation; import org.apache.flink.api.java.tuple.Tuple2; @@ 
-393,7 +393,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver // -------------------------------------------------------------------------------------------- private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> - extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> + extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>> { private static final long serialVersionUID = 1L; @@ -463,7 +463,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver * UDF that encapsulates the message sending function for graphs where the edges have no associated values. */ private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> - extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> + extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> implements ResultTypeQueryable<Tuple2<VertexKey, Message>> { private static final long serialVersionUID = 1L; @@ -516,7 +516,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver * UDF that encapsulates the message sending function for graphs where the edges have an associated value. 
*/ private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> - extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> + extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> implements ResultTypeQueryable<Tuple2<VertexKey, Message>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java index ea90feb..a4ba6fa 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java @@ -18,7 +18,7 @@ package org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.spargel.java.MessageIterator; import org.apache.flink.spargel.java.MessagingFunction; @@ -70,7 +70,7 @@ public class SpargelConnectedComponents { * A map function that takes a Long value and creates a 2-tuple out of it: * <pre>(Long value) -> (value, value)</pre> */ - public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> { + public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> 
map(Long value) { return new Tuple2<Long, Long>(value, value); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java index c7fbaaa..9dfc327 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java @@ -18,8 +18,8 @@ package org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.spargel.java.MessageIterator; @@ -48,7 +48,7 @@ public class SpargelPageRank { // enumerate some sample edges and assign an initial uniform probability (rank) DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices) - .map(new MapFunction<Long, Tuple2<Long, Double>>() { + .map(new RichMapFunction<Long, Tuple2<Long, Double>>() { public Tuple2<Long, Double> map(Long value) { return new Tuple2<Long, Double>(value, 1.0/numVertices); } @@ -56,7 +56,7 @@ public class SpargelPageRank { // generate some random edges. 
the transition probability on each edge is 1/num-out-edges of the source vertex DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices) - .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() { + .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() { public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) { int numOutEdges = (int) (Math.random() * (numVertices / 2)); for (int i = 0; i < numOutEdges; i++) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java index 34c9ad8..43c0b84 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java @@ -18,9 +18,9 @@ package org.apache.flink.spargel.java.examples; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.MapFunction; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; @@ -53,7 +53,7 @@ public class SpargelPageRankCountingVertices { // generate some random edges. 
the transition probability on each edge is 1/num-out-edges of the source vertex DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES) - .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() { + .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() { public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) { int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2)); for (int i = 0; i < numOutEdges; i++) { @@ -67,12 +67,12 @@ public class SpargelPageRankCountingVertices { // count the number of vertices DataSet<Long> count = vertices - .map(new MapFunction<Long, Long>() { + .map(new RichMapFunction<Long, Long>() { public Long map(Long value) { return 1L; } }) - .reduce(new ReduceFunction<Long>() { + .reduce(new RichReduceFunction<Long>() { public Long reduce(Long value1, Long value2) { return value1 + value2; } @@ -80,7 +80,7 @@ public class SpargelPageRankCountingVertices { // enumerate some sample edges and assign an initial uniform probability (rank) DataSet<Tuple2<Long, Double>> intialRanks = vertices - .map(new MapFunction<Long, Tuple2<Long, Double>>() { + .map(new RichMapFunction<Long, Tuple2<Long, Double>>() { private long numVertices; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java index a34f2db..16b004c 100644 --- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java +++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java @@ -21,7 
+21,7 @@ package org.apache.flink.test.spargel; import java.io.BufferedReader; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -72,7 +72,7 @@ public class SpargelConnectedComponentsITCase extends JavaProgramTestBase { } } - public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> { + public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> { public Tuple2<Long, Long> map(String value) { String[] nums = value.split(" "); return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1])); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java index 7320b7b..2b64b84 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java @@ -19,7 +19,7 @@ package org.apache.flink.client.testjar; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -75,7 +75,7 @@ public class WordCount { * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
*/ - public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> { + public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java index 9f10be5..6eccc8a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java @@ -18,9 +18,9 @@ package org.apache.flink.compiler.util; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; -public class NoOpFunction extends AbstractFunction { +public class NoOpFunction extends AbstractRichFunction { private static final long serialVersionUID = 1L; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java index 571f4e4..10fa34f 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java @@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import 
org.apache.flink.api.java.IterativeDataSet; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; import org.apache.flink.api.java.record.operators.BulkIteration; import org.apache.flink.api.java.record.operators.CoGroupOperator; import org.apache.flink.api.java.record.operators.CrossOperator; @@ -40,7 +40,6 @@ import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.JoinOperator; import org.apache.flink.api.java.record.operators.MapOperator; import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.plan.OptimizedPlan; import org.apache.flink.compiler.plan.SinkPlanNode; import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; @@ -959,7 +958,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { result1.join(result2) .where(new IdentityKeyExtractor<String>()) .equalTo(new IdentityKeyExtractor<String>()) - .with(new JoinFunction<String, String, String>() { + .with(new RichJoinFunction<String, String, String>() { @Override public String join(String first, String second) { return null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java index 55c53f0..8226dbf 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java @@ -28,9 +28,8 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase; import 
org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.IterativeDataSet; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.dag.TempMode; import org.apache.flink.compiler.plan.DualInputPlanNode; import org.apache.flink.compiler.plan.OptimizedPlan; @@ -256,7 +255,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { } - private static class DummyJoiner extends JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> { + private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> { @Override public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java index 1c30545..3624d86 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java @@ -21,16 +21,15 @@ package org.apache.flink.compiler; import java.util.Iterator; +import org.apache.flink.api.java.functions.RichCoGroupFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.junit.Assert; import org.junit.Test; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; import 
org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.CoGroupFunction; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.Channel; import org.apache.flink.compiler.plan.DualInputPlanNode; import org.apache.flink.compiler.plan.OptimizedPlan; @@ -43,13 +42,13 @@ import org.apache.flink.util.Visitor; @SuppressWarnings("serial") public class CoGroupSolutionSetFirstTest extends CompilerTestBase { - public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> { + public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> { @Override public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception { } } - public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> { + public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> { @Override public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java index ff4d6b0..b2c163b 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java @@ -27,14 +27,12 @@ import java.util.Map; import java.util.Set; import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.Function; +import 
org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.base.GenericDataSourceBase; import org.apache.flink.api.java.record.operators.BulkIteration; import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.compiler.DataStatistics; -import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.costs.DefaultCostEstimator; import org.apache.flink.compiler.plan.OptimizedPlan; import org.apache.flink.compiler.plan.PlanNode; @@ -181,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { } @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) { + public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) { List<PlanNode> nodes = this.map.get(name); if (nodes == null || nodes.isEmpty()) { throw new RuntimeException("No node found with the given name and stub class."); @@ -243,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { } @SuppressWarnings("unchecked") - public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) { + public <T extends Operator<?>> T getNode(String name, Class<? 
extends RichFunction> stubClass) { List<Operator<?>> nodes = this.map.get(name); if (nodes == null || nodes.isEmpty()) { throw new RuntimeException("No node found with the given name and stub class."); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java index d50a7d6..0fbf072 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java @@ -22,8 +22,8 @@ package org.apache.flink.compiler; import static org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*; import static org.junit.Assert.*; -import org.apache.flink.api.common.functions.GenericJoiner; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.OperatorInformation; import org.apache.flink.api.common.operators.Order; @@ -47,7 +47,7 @@ import org.apache.flink.compiler.plan.DualInputPlanNode; import org.apache.flink.compiler.plan.SingleInputPlanNode; import org.apache.flink.compiler.plan.SourcePlanNode; import org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport; -import org.apache.flink.compiler.testfunctions.DummyJoinFunction; +import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction; import org.apache.flink.compiler.testfunctions.IdentityMapper; import org.apache.flink.core.fs.Path; import 
org.apache.flink.runtime.operators.DriverStrategy; @@ -1426,10 +1426,10 @@ public class FeedbackPropertiesMatchTest { } private static final MapNode getMapNode() { - return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op")); + return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op")); } private static final MatchNode getJoinNode() { - return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op")); + return new MatchNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op")); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java index f3e513a..b9cc769 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java @@ 
-23,9 +23,9 @@ import java.util.Iterator; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.functions.GroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.ReduceGroupOperator; +import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.util.Collector; @@ -50,7 +50,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); - data.reduceGroup(new GroupReduceFunction<Double, Double>() { + data.reduceGroup(new RichGroupReduceFunction<Double, Double>() { public void reduce(Iterator<Double> values, Collector<Double> out) {} }).name("reducer") .print().name("sink"); @@ -94,7 +94,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); - ReduceGroupOperator<Long, Long> reduced = data.reduceGroup(new GroupReduceFunction<Long, Long>() { + GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() { public void reduce(Iterator<Long> values, Collector<Long> out) {} }).name("reducer"); @@ -147,7 +147,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java data .groupBy(1) - .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer") .print().name("sink"); @@ -194,9 +194,9 @@ public class GroupReduceCompilationTest 
extends CompilerTestBase implements java DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); - ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data + GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data .groupBy(1) - .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer"); @@ -255,7 +255,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java .groupBy(new KeySelector<Tuple2<String,Double>, String>() { public String getKey(Tuple2<String, Double> value) { return value.f0; } }) - .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer") .print().name("sink"); @@ -309,11 +309,11 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) .name("source").setParallelism(6); - ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data + GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data .groupBy(new KeySelector<Tuple2<String,Double>, String>() { public String getKey(Tuple2<String, Double> value) { return value.f0; } }) - .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { + .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { 
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} }).name("reducer"); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java index d3c3e3f..8fc4324 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.compiler; import static org.junit.Assert.*; +import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.junit.Test; import java.util.Iterator; @@ -31,10 +32,9 @@ import org.apache.flink.api.java.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.IterativeDataSet; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -261,7 +261,7 @@ public class IterationsCompilerTest extends CompilerTestBase { } - public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final 
class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) { @@ -269,13 +269,13 @@ public class IterationsCompilerTest extends CompilerTestBase { } } - public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { + public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> { @Override public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {} } - public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception { @@ -284,14 +284,14 @@ public class IterationsCompilerTest extends CompilerTestBase { } @ConstantFields("0") - public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> { + public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> { @Override public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {} } @ConstantFields("0") - public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> { + public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> { @Override public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception { @@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase { } } - public static final class DuplicateValueScalar<T> extends MapFunction<T, Tuple2<T, T>> { + public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> { @Override public Tuple2<T, T> map(T value) { 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java index a654872..fb8ae8d 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java @@ -22,7 +22,7 @@ package org.apache.flink.compiler; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.junit.Test; import org.apache.flink.api.java.DataSet; @@ -46,7 +46,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source"); - data.reduce(new ReduceFunction<Double>() { + data.reduce(new RichReduceFunction<Double>() { @Override public Double reduce(Double value1, Double value2){ @@ -91,7 +91,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); - data.reduce(new ReduceFunction<Long>() { + data.reduce(new RichReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2){ @@ -145,7 +145,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S data .groupBy(1) - .reduce(new ReduceFunction<Tuple2<String,Double>>() { + .reduce(new RichReduceFunction<Tuple2<String,Double>>() { @Override public Tuple2<String, Double> reduce(Tuple2<String, 
Double> value1, Tuple2<String, Double> value2){ return null; @@ -205,7 +205,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S .groupBy(new KeySelector<Tuple2<String,Double>, String>() { public String getKey(Tuple2<String, Double> value) { return value.f0; } }) - .reduce(new ReduceFunction<Tuple2<String,Double>>() { + .reduce(new RichReduceFunction<Tuple2<String,Double>>() { @Override public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){ return null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java index 1f653c0..1020c8b 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.base.FlatMapOperatorBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FlatMapFunction; +import org.apache.flink.api.java.functions.RichFlatMapFunction; import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.ReduceOperator; @@ -174,7 +174,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { }); } - public static final class DummyFlatMap extends FlatMapFunction<String, Tuple2<String, Integer>> { + public static final class DummyFlatMap 
extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java index e04256c..64a4791 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java @@ -31,9 +31,9 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.JoinFunction; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichJoinFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.compiler.plan.DualInputPlanNode; import org.apache.flink.compiler.plan.OptimizedPlan; @@ -214,7 +214,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { iter.getWorkset().join(invariantInput) .where(1, 2) .equalTo(1, 2) - .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { + .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, 
Tuple3<Long, Long, Long> second) { return first; } @@ -224,7 +224,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { result.join(iter.getSolutionSet()) .where(1, 0) .equalTo(0, 2) - .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { + .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { return second; } @@ -263,7 +263,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { iter.getWorkset().join(invariantInput) .where(1, 2) .equalTo(1, 2) - .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { + .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { return first; } @@ -273,7 +273,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { .join(iter.getSolutionSet()) .where(1, 0) .equalTo(1, 2) - .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { + .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) { return second; } @@ -282,7 +282,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { .withConstantSetSecond(joinPreservesSolutionSet ? 
new String[] {"0->0", "1->1", "2->2" } : null); DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2) - .reduceGroup(new GroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() { + .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() { public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {} }) .name(NEXT_WORKSET_REDUCER_NAME) @@ -290,7 +290,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ? - joinedWithSolutionSet.map(new MapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } }) + joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } }) .name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") : joinedWithSolutionSet; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java new file mode 100644 index 0000000..2388db4 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.compiler.testfunctions; + +import org.apache.flink.api.java.functions.RichFlatJoinFunction; +import org.apache.flink.util.Collector; + +public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> { + + private static final long serialVersionUID = 1L; + + @Override + public void join(T first, T second, Collector<T> out) { + out.collect(null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java deleted file mode 100644 index 0db075f..0000000 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.compiler.testfunctions; - -import org.apache.flink.api.java.functions.JoinFunction; - -public class DummyJoinFunction<T> extends JoinFunction<T, T, T> { - - private static final long serialVersionUID = 1L; - - @Override - public T join(T first, T second) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java index fe61f25..42275af 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java @@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions; import java.util.Iterator; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; @Combinable -public class IdentityGroupReducer<T> extends GroupReduceFunction<T, T> { +public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> { 
private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java index b6aa40b..29fc2c8 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java @@ -19,9 +19,9 @@ package org.apache.flink.compiler.testfunctions; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; -public class IdentityMapper<T> extends MapFunction<T, T> { +public class IdentityMapper<T> extends RichMapFunction<T, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java index 91634cc..7ce267f 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java @@ -18,9 +18,9 @@ package org.apache.flink.compiler.testfunctions; -import org.apache.flink.api.java.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; -public class SelectOneReducer<T> extends ReduceFunction<T> { +public class SelectOneReducer<T> extends 
RichReduceFunction<T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java index 26db00e..3f24e65 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java @@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions; import java.util.Iterator; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; @Combinable -public class Top1GroupReducer<T> extends GroupReduceFunction<T, T> { +public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java index 51ad75d..736ee14 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java @@ -23,14 +23,13 @@ import 
java.io.Serializable; import org.apache.flink.api.java.record.functions.CrossFunction; import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -public class DummyCrossStub extends CrossFunction implements Serializable { +public class DummyCrossStub extends CrossFunction { private static final long serialVersionUID = 1L; + @Override - public void cross(Record record1, Record record2, Collector<Record> out) { - out.collect(record1); - out.collect(record2); + public Record cross(Record first, Record second) throws Exception { + return first; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java deleted file mode 100644 index f4b2763..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.common.functions; - -import java.io.Serializable; - -import org.apache.flink.configuration.Configuration; - -/** - * An abstract stub implementation for user-defined functions. It offers default implementations - * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the - * {@link RuntimeContext} and {@link IterationRuntimeContext}. - */ -public abstract class AbstractFunction implements Function, Serializable { - - private static final long serialVersionUID = 1L; - - // -------------------------------------------------------------------------------------------- - // Runtime context access - // -------------------------------------------------------------------------------------------- - - private transient RuntimeContext runtimeContext; - - public void setRuntimeContext(RuntimeContext t) { - if (this.runtimeContext == null) { - this.runtimeContext = t; - } else { - throw new IllegalStateException("Error: The runtime context has already been set."); - } - } - - public RuntimeContext getRuntimeContext() { - if (this.runtimeContext != null) { - return this.runtimeContext; - } else { - throw new IllegalStateException("The runtime context has not been initialized."); - } - } - - public IterationRuntimeContext getIterationRuntimeContext() { - if (this.runtimeContext == null) { - throw new IllegalStateException("The runtime context has not been initialized."); - } else if (this.runtimeContext instanceof IterationRuntimeContext) { - return (IterationRuntimeContext) this.runtimeContext; - } else { - throw new IllegalStateException("This stub is not part of an iteration step function."); - } - } - - // -------------------------------------------------------------------------------------------- - // Default life cycle methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(Configuration parameters) throws Exception {} - - 
@Override - public void close() throws Exception {} -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java new file mode 100644 index 0000000..07b957d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; + +import org.apache.flink.configuration.Configuration; + +/** + * An abstract stub implementation for user-defined functions. It offers default implementations + * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the + * {@link RuntimeContext} and {@link IterationRuntimeContext}. 
+ */ +public abstract class AbstractRichFunction implements RichFunction, Serializable { + + private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- + // Runtime context access + // -------------------------------------------------------------------------------------------- + + private transient RuntimeContext runtimeContext; + + public void setRuntimeContext(RuntimeContext t) { + if (this.runtimeContext == null) { + this.runtimeContext = t; + } else { + throw new IllegalStateException("Error: The runtime context has already been set."); + } + } + + public RuntimeContext getRuntimeContext() { + if (this.runtimeContext != null) { + return this.runtimeContext; + } else { + throw new IllegalStateException("The runtime context has not been initialized."); + } + } + + public IterationRuntimeContext getIterationRuntimeContext() { + if (this.runtimeContext == null) { + throw new IllegalStateException("The runtime context has not been initialized."); + } else if (this.runtimeContext instanceof IterationRuntimeContext) { + return (IterationRuntimeContext) this.runtimeContext; + } else { + throw new IllegalStateException("This stub is not part of an iteration step function."); + } + } + + // -------------------------------------------------------------------------------------------- + // Default life cycle methods + // -------------------------------------------------------------------------------------------- + + @Override + public void open(Configuration parameters) throws Exception {} + + @Override + public void close() throws Exception {} +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java new file mode 100644 index 0000000..5c200af --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; +import java.util.Iterator; + +import org.apache.flink.util.Collector; + + +public interface CoGroupFunction<V1, V2, O> extends Function, Serializable { + + /** + * This method must be implemented to provide a user implementation of a + * coGroup. It is called with the records from the first and the second + * input that share the same key. + * + * @param first The records from the first input which were paired with the key. + * @param second The records from the second input which were paired with the key. + * @param out A collector that collects all output pairs. 
+ */ + void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java new file mode 100644 index 0000000..d72c4c8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.common.functions; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * Generic interface used for combiners. + */ +public interface CombineFunction<T> extends Function, Serializable { + + T combine(Iterator<T> records) throws Exception; +}
