http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java index 2293b5e..c045508 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java @@ -20,22 +20,14 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; /** - * The abstract base class for flatMap functions. FlatMap functions take elements and transform them, - * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists - * and arrays. Operations that produce multiple strictly one result element per input element can also - * use the {@link RichMapFunction}. - * <p> - * The basic syntax for using a FlatMapFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<Y> result = input.flatMap(new MyFlatMapFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the FlatMapFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link FlatMapFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. @@ -44,16 +36,6 @@ public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction private static final long serialVersionUID = 1L; - /** - * The core method of the FlatMapFunction. Takes an element from the input data set and transforms - * it into zero, one, or more elements. - * - * @param value The input value. - * @param out The collector for for emitting result values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ @Override public abstract void flatMap(IN value, Collector<OUT> out) throws Exception; }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java index eb75f53..801f592 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java @@ -27,26 +27,14 @@ import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.util.Collector; /** - * The abstract base class for group reduce functions. Group reduce functions process groups of elements. - * They may aggregate them to a single value, or produce multiple result values for each group. - * <p> - * For a reduce functions that works incrementally by combining always two elements, see - * {@link RichReduceFunction}, called via {@link org.apache.flink.api.java.DataSet#reduce(RichReduceFunction)}. - * <p> - * The basic syntax for using a grouped GroupReduceFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction()); - * </blockquote></pre> - * <p> - * GroupReduceFunctions may be "combinable", in which case they can pre-reduce partial groups in order to - * reduce the data volume early. See the {@link #combine(Iterator, Collector)} function for details. - * <p> - * Like all functions, the GroupReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link GroupReduceFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN> Type of the elements that this function processes. * @param <OUT> The type of the elements returned by the user-defined function. @@ -55,16 +43,6 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct private static final long serialVersionUID = 1L; - /** - * Core method of the reduce function. It is called one per group of elements. If the reducer - * is not grouped, than the entire data set is considered one group. - * - * @param values The iterator returning the group of values to be reduced. - * @param out The collector to emit the returned values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ @Override public abstract void reduce(Iterator<IN> values, Collector<OUT> out) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java index 7eaf44c..a0c28ee 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java @@ -18,10 +18,20 @@ package org.apache.flink.api.java.functions; - import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.RichFunction; +/** + * Rich variant of the {@link JoinFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param <IN1> The type of the elements in the first input. + * @param <IN2> The type of the elements in the second input. + * @param <OUT> The type of the result elements. + */ public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java index 54de7d4..f6f5356 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java @@ -20,22 +20,13 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFunction; /** - * The abstract base class for Map functions. Map functions take elements and transform them, - * element wise. A Map function always produces a single result element for each input element. - * Typical applications are parsing elements, converting data types, or projecting out fields. - * Operations that produce multiple result elements from a single input element can be implemented - * using the {@link RichFlatMapFunction}. - * <p> - * The basic syntax for using a MapFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<Y> result = input.map(new MyMapFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the MapFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link MapFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. @@ -44,16 +35,6 @@ public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction impl private static final long serialVersionUID = 1L; - /** - * The core method of the MapFunction. Takes an element from the input data set and transforms - * it into another element. - * - * @param value The input value. - * @return The value produced by the map function from the input value. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ @Override public abstract OUT map(IN value) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java index 35cb392..a63f8dc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java @@ -20,27 +20,13 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; /** - * The abstract base class for Reduce functions. Reduce functions combine groups of elements to - * a single value, by taking always two elements and combining them into one. Reduce functions - * may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced - * individually. - * <p> - * For a reduce functions that work on an entire group at the same time (such as the - * MapReduce/Hadoop-style reduce), see {@link RichGroupReduceFunction}, called via - * {@link org.apache.flink.api.java.DataSet#reduceGroup(RichGroupReduceFunction)}. In the general case, - * ReduceFunctions are considered faster, because they allow the system to use hash-based - * execution strategies. - * <p> - * The basic syntax for using a grouped ReduceFunction is as follows: - * <pre><blockquote> - * DataSet<X> input = ...; - * - * DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction()); - * </blockquote></pre> - * <p> - * Like all functions, the ReduceFunction needs to be serializable, as defined in {@link java.io.Serializable}. + * Rich variant of the {@link ReduceFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. * * @param <T> Type of the elements that this function processes. */ @@ -48,16 +34,5 @@ public abstract class RichReduceFunction<T> extends AbstractRichFunction impleme private static final long serialVersionUID = 1L; - /** - * The core method of the ReduceFunction, combining two values into one value of the same type. - * The reduce function is consecutively applied to all values of a group until only a single value remains. - * - * @param value1 The first value to combine. - * @param value2 The second value to combine. - * @return The combined value of both input values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ public abstract T reduce(T value1, T value2) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index 89c3334..744893b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -515,7 +515,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU if (function == null) { throw new NullPointerException("CoGroup function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(function)) { + if (FunctionUtils.isLambdaFunction(function)) { throw new UnsupportedLambdaExpressionException(); } TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java index d1e99d6..a24a093 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java @@ -134,7 +134,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, if (function == null) { throw new NullPointerException("Cross function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(function)) { + if (FunctionUtils.isLambdaFunction(function)) { throw new UnsupportedLambdaExpressionException(); } TypeInformation<R> returnType = TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index 591551f..7646fa0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.operators; import java.util.Iterator; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; @@ -29,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; @@ -80,7 +80,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) { final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>(); - final FlatCombineFunction<T> combineFunction = new DistinctCombiner<T>(); String name = function.getClass().getName(); @@ -104,7 +103,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct( - selectorKeys, function, combineFunction, getInputType(), getResultType(), name, input, true); + selectorKeys, function, getInputType(), getResultType(), name, input); po.setDegreeOfParallelism(this.getParallelism()); @@ -118,9 +117,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera // -------------------------------------------------------------------------------------------- private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct( - Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, FlatCombineFunction<IN> combineFunction, - TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input, - boolean combinable) + Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, + TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) { @SuppressWarnings("unchecked") final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys; @@ -131,7 +129,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = - new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable); + new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true); MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor"); @@ -144,26 +142,14 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera return reducer; } + @Combinable public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterator<T> values, Collector<T> out) - throws Exception { + public void reduce(Iterator<T> values, Collector<T> out) { out.collect(values.next()); } } - - public static final class DistinctCombiner<T> implements FlatCombineFunction<T> { - - private static final long serialVersionUID = 1L; - - @Override - public void combine(Iterator<T> values, Collector<T> out) - throws Exception { - out.collect(values.next()); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index 7ab0b11..e1424ad 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -51,9 +51,6 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT private final Grouping<IN> grouper; - // reduceFunction is a GroupReduceFunction - private boolean richFunction; - private boolean combinable; /** @@ -176,8 +173,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po = - new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); + GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po = + new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); po.setCombinable(combinable); po.setInput(input); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java index 3223f4d..200c4de 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java @@ -26,8 +26,8 @@ import org.apache.flink.api.java.DataSet; * Grouping is an intermediate step for a transformation on a grouped DataSet.<br/> * The following transformation can be applied on Grouping: * <ul> - * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li> - * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}, and</li> + * <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li> + * <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}, and</li> * <li>{@link UnsortedGrouping#aggregate(org.apache.flink.api.java.aggregation.Aggregations, int)}.</li> * </ul> * http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index a07a157..ce0aea7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -467,7 +467,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, if (function == null) { throw new NullPointerException("Join function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(function)) { + if (FunctionUtils.isLambdaFunction(function)) { throw new UnsupportedLambdaExpressionException(); } TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type()); @@ -478,10 +478,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, if (function == null) { throw new NullPointerException("Join function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(function)) { + if (FunctionUtils.isLambdaFunction(function)) { throw new UnsupportedLambdaExpressionException(); } - FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function); + FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function); TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type()); return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 97b2417..767f75a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -32,7 +32,7 @@ import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/> * The following transformation can be applied on sorted groups: * <ul> - * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li> + * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)},</li> * </ul> * * @param <T> The type of the elements of the sorted and grouped DataSet. @@ -82,7 +82,7 @@ public class SortedGrouping<T> extends Grouping<T> { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } return new GroupReduceOperator<T, R>(this, reducer); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 9e71ba0..87b1454 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -127,7 +127,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + if (FunctionUtils.isLambdaFunction(reducer)) { throw new UnsupportedLambdaExpressionException(); } return new GroupReduceOperator<T, R>(this, reducer); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 5e80455..29eb5ed 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -39,7 +39,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), + super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); super.setCombinable(combinable); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java index fa0ca11..e9b5c25 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java @@ -26,7 +26,7 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; /** - * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}. + * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}. */ public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java index c4587fd..3a6c931 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.types.Record; /** - * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}. + * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CrossOperator}. */ public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> { @@ -41,10 +41,6 @@ public abstract class CrossFunction extends AbstractRichFunction implements org. * runtime catches an exception, it aborts the task and lets the fail-over logic * decide whether to retry the task execution. */ - - //@Override - //public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception; - @Override public abstract Record cross(Record first, Record second) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java index dce24a3..cc4f96b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.java.record.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; @@ -25,25 +24,13 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; /** - * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}. + * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.record.operators.JoinOperator}. * It resembles an equality join of both inputs on their key fields. */ public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; - /** - * This method must be implemented to provide a user implementation of a join. - * It is called for each two records that share the same key and come from different inputs. - * - * @param value1 The record that comes from the first input. - * @param value2 The record that comes from the second input. - * @return The result of the join UDF as record - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ @Override public abstract void join(Record value1, Record value2, Collector<Record> out) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java index 99c945d..b082e2d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.java.record.functions; import org.apache.flink.api.common.functions.AbstractRichFunction; @@ -28,6 +27,7 @@ import org.apache.flink.util.Collector; * The MapFunction must be extended to provide a mapper implementation * By definition, the mapper is called for each individual input record. */ +@SuppressWarnings("deprecation") public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java index 073b11a..a1e6369 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java @@ -29,7 +29,7 @@ import org.apache.flink.util.Collector; /** * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a - * {@link org.apache.flink.api.java.operators.ReduceOperator}. + * {@link org.apache.flink.api.java.record.operators.ReduceOperator}. */ public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java index 64f70f6..85afa64 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapOperator.java @@ -41,6 +41,7 @@ import org.apache.flink.types.Record; * * @see MapFunction */ +@SuppressWarnings("deprecation") public class MapOperator extends CollectorMapOperatorBase<Record, Record, MapFunction> implements RecordOperator { private static String DEFAULT_NAME = "<Unnamed Mapper>"; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java index 155bbd1..be872e5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java @@ -129,7 +129,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable { assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass()); assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass()); if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) { - WrappingFunction wf = (WrappingFunction) solutionSetJoin.getUserCodeWrapper().getUserCodeObject(); + WrappingFunction<?> wf = (WrappingFunction<?>) solutionSetJoin.getUserCodeWrapper().getUserCodeObject(); assertEquals(SolutionWorksetJoin.class, wf.getWrappedFunction().getClass()); } else { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java new file mode 100644 index 0000000..ec3898e --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operators.translation; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.DistinctOperator; +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings("serial") +public class DistrinctTranslationTest { + + @Test + public void testCombinable() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> input = env.fromElements("1", "2", "1", "3"); + + + DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() { + public String getKey(String value) { return value; } + }); + + op.print(); + + Plan p = env.createProgramPlan(); + + GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput(); + Assert.assertTrue(reduceOp.isCombinable()); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index c6ad73d..8346d00 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -1385,6 +1385,8 @@ public class TypeExtractorTest { public void testFunction() { RichMapFunction<String, Boolean> mapInterface = new RichMapFunction<String, Boolean>() { + private static final long serialVersionUID = 1L; + @Override public void setRuntimeContext(RuntimeContext t) { @@ -1417,6 +1419,8 @@ public class TypeExtractorTest { @Test public void testInterface() { MapFunction<String, Boolean> mapInterface = new MapFunction<String, Boolean>() { + private static final long serialVersionUID = 1L; + @Override public Boolean map(String record) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java index c417249..7dd4dea 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class CoGroupITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java index f8d217e..3875bab 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class CrossITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java index c775425..bb04336 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java @@ -23,8 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.JavaProgramTestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.FileNotFoundException; import java.io.IOException; @@ -34,9 +32,11 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -@RunWith(Parameterized.class) +@SuppressWarnings("serial") public class FilterITCase extends JavaProgramTestBase { + private static final String EXPECTED_RESULT = "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n"; public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) { @@ -68,15 +68,7 @@ public class FilterITCase extends JavaProgramTestBase { return env.fromCollection(data); } - private static int NUM_PROGRAMS = 1; - - private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; - private String expectedResult; - - public FilterITCase(Configuration config) { - super(config); - } @Override protected void preSubmit() throws Exception { @@ -85,58 +77,18 @@ public class FilterITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - expectedResult = FilterProgs.runProgram(curProgId, resultPath); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> filterDs = ds. + filter(value -> value.f2.contains("world")); + filterDs.writeAsCsv(resultPath); + env.execute(); } @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Parameterized.Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); + compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); } - - private static class FilterProgs { - - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Test lambda filter - * Functionality identical to org.apache.flink.test.javaApiOperators.FilterITCase test 3 - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<Integer, Long, String>> ds = get3TupleDataSet(env); - DataSet<Tuple3<Integer, Long, String>> filterDs = ds. - filter(value -> value.f2.contains("world")); - filterDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - - } - - } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java index 043b4e8..431151e 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class FlatJoinITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java index 55f507c..5cf7fc2 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java @@ -26,6 +26,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class FlatMapITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java index 494aff6..a86de1f 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class GroupReduceITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java index 3f4f696..d44d116 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class JoinITCase implements Serializable { @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java new file mode 100644 index 0000000..4c8ee23 --- /dev/null +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/LambdaExtractionTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators.lambdas; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.java.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Test; + +@SuppressWarnings("serial") +public class LambdaExtractionTest { + + @Test + public void testIdentifyLambdas() { + try { + MapFunction<?, ?> anonymousFromInterface = new MapFunction<String, Integer>() { + @Override + public Integer map(String value) { return Integer.parseInt(value); } + }; + + MapFunction<?, ?> anonymousFromClass = new RichMapFunction<String, Integer>() { + @Override + public Integer map(String value) { return Integer.parseInt(value); } + }; + + MapFunction<?, ?> fromProperClass = new StaticMapper(); + + MapFunction<?, ?> fromDerived = new ToTuple<Integer>() { + @Override + public Tuple2<Integer, Long> map(Integer value) { + return new Tuple2<Integer, Long>(value, 1L); + } + }; + + MapFunction<String, Integer> lambda = (str) -> Integer.parseInt(str); + + assertFalse(FunctionUtils.isLambdaFunction(anonymousFromInterface)); + assertFalse(FunctionUtils.isLambdaFunction(anonymousFromClass)); + assertFalse(FunctionUtils.isLambdaFunction(fromProperClass)); + assertFalse(FunctionUtils.isLambdaFunction(fromDerived)); + assertTrue(FunctionUtils.isLambdaFunction(lambda)); + assertTrue(FunctionUtils.isLambdaFunction(STATIC_LAMBDA)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + public static class StaticMapper implements MapFunction<String, Integer> { + + @Override + public Integer map(String value) { return Integer.parseInt(value); } + } + + public interface ToTuple<T> extends MapFunction<T, Tuple2<T, Long>> { + + @Override + public Tuple2<T, Long> map(T value) throws Exception; + } + + private static final MapFunction<String, Integer> STATIC_LAMBDA = (str) -> Integer.parseInt(str); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java index 3af360b..5e9f732 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java @@ -26,6 +26,7 @@ import org.junit.Test; import java.io.Serializable; +@SuppressWarnings("serial") public class MapITCase implements Serializable{ @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java index ab27fe4..1a34814 100644 --- a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java +++ b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java @@ -36,9 +36,20 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -@RunWith(Parameterized.class) +@SuppressWarnings("serial") public class ReduceITCase extends JavaProgramTestBase { + private static final String EXPECTED_RESULT = "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) { List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<Tuple5<Integer, Long, Integer, String, Long>>(); @@ -71,17 +82,9 @@ public class ReduceITCase extends JavaProgramTestBase { return env.fromCollection(data, type); } - - private static int NUM_PROGRAMS = 1; - - private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; - private String expectedResult; - - public ReduceITCase(Configuration config) { - super(config); - } - + @Override protected void preSubmit() throws Exception { resultPath = getTempDirPath("result"); @@ -89,72 +92,23 @@ public class ReduceITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - expectedResult = ReduceProgs.runProgram(curProgId, resultPath); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds + .groupBy(4, 0) + .reduce((in1, in2) -> { + Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); + out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); + return out; + }); + + reduceDs.writeAsCsv(resultPath); + env.execute(); } @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Parameterized.Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - private static class ReduceProgs { - - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Test reduce with lambda - * Functionality identical to org.apache.flink.test.javaApiOperators.ReduceITCase test 2 - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = get5TupleDataSet(env); - DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds - .groupBy(4, 0) - .reduce((in1, in2) -> { - Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>(); - out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4); - return out; - }); - - reduceDs.writeAsCsv(resultPath); - env.execute(); - - // return expected result - return "1,1,0,Hallo,1\n" + - "2,3,2,Hallo Welt wie,1\n" + - "2,2,1,Hallo Welt,2\n" + - "3,9,0,P-),2\n" + - "3,6,5,BCD,3\n" + - "4,17,0,P-),1\n" + - "4,17,0,P-),2\n" + - "5,11,10,GHI,1\n" + - "5,29,0,P-),2\n" + - "5,25,0,P-),3\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - - } - + compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath); } - - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java index 1db3524..9ff7181 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.iterative.io; import java.io.IOException; @@ -28,13 +27,12 @@ import org.apache.flink.util.Collector; /** * A {@link Collector} to update the solution set of a workset iteration. * <p/> - * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates. + * The records are written to a HashTable hash table to allow in-memory point updates. * <p/> * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This * is for example the case when a solution set update happens directly after a solution set join. If this assumption * doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating. - * - * @see {SolutionSetUpdateOutputCollector} + */ public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java index 17670f1..89789c35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java @@ -28,7 +28,7 @@ import org.apache.flink.util.Collector; /** * A {@link Collector} to update the solution set of a workset iteration. * <p/> - * The records are written to a {@link MutableHashTable} hash table to allow in-memory point updates. + * The records are written to a HashTable hash table to allow in-memory point updates. * <p/> * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 34cd232..636c492 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.aggregators.LongSumAggregator; -import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -54,7 +54,7 @@ import java.io.IOException; /** * The base class for all tasks able to participate in an iteration. */ -public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT> +public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT> implements Terminable { private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java index 7a77cff..797bbb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.iterative.task; import java.io.IOException; @@ -25,7 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -57,7 +56,7 @@ import org.apache.flink.util.MutableObjectIterator; * The head is responsible for coordinating an iteration and can run a * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing - * the input, it will send {@link EndOfSuperstepEvent} events to its outputs. It must also be connected to a + * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a * synchronization task and after each superstep, it will wait * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished * their iteration. Starting with @@ -75,7 +74,7 @@ import org.apache.flink.util.MutableObjectIterator; * The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the * same as {@code X} */ -public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> { +public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> { private static final Log log = LogFactory.getLog(IterationHeadPactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java index 2a8325c..c23eae1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java @@ -16,14 +16,13 @@ * limitations under the License. */ - package org.apache.flink.runtime.iterative.task; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.runtime.io.network.api.BufferWriter; import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent; import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel; @@ -32,16 +31,16 @@ import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; import org.apache.flink.util.Collector; /** - * An intermediate iteration task, which runs a {@link PactDriver} inside. + * An intermediate iteration task, which runs a Driver}inside. * <p/> * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore * intermediate tasks can also update the iteration state, either the workset or the solution set. * <p/> * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via - * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case + * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the solution set. In this case * this task must be scheduled on the same instance as the head. */ -public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> { +public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> { private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index a06ef5d..c44f443 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -44,9 +44,9 @@ import org.apache.flink.types.Value; import com.google.common.base.Preconditions; /** - * The task responsible for synchronizing all iteration heads, implemented as an {@link AbstractOutputTask}. This task + * The task responsible for synchronizing all iteration heads, implemented as an output task. This task * will never see any data. - * In each superstep, it simply waits until it has receiced a {@link WorkerDoneEvent} from each head and will send back + * In each superstep, it simply waits until it has received a {@link WorkerDoneEvent} from each head and will send back * an {@link AllWorkersDoneEvent} to signal that the next superstep can begin. */ public class IterationSynchronizationSinkTask extends AbstractInvokable implements Terminable { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java index 942e2f6..90d732c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java @@ -16,12 +16,11 @@ * limitations under the License. */ - package org.apache.flink.runtime.iterative.task; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier; import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker; import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector; @@ -29,16 +28,16 @@ import org.apache.flink.runtime.operators.PactTaskContext; import org.apache.flink.util.Collector; /** - * An iteration tail, which runs a {@link PactDriver} inside. + * An iteration tail, which runs a driver inside. * <p/> * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via - * a {@link BlockingBackChannel} for the workset -OR- a {@link MutableHashTable} for the solution set. Therefore this + * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset * and the solution set. * <p/> * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish. */ -public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> +public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> implements PactTaskContext<S, OT> { private static final Log log = LogFactory.getLog(IterationTailPactTask.class); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java deleted file mode 100644 index 78cf1f5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RuntimeExecutionContext.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.operators; - -import org.apache.flink.api.common.functions.ExecutionContext; -import org.apache.flink.runtime.execution.Environment; - - -/** - * Default implementation of the {@link ExecutionContext} that delegates the calls to the nephele task - * environment. - * - */ -public class RuntimeExecutionContext implements ExecutionContext -{ - private final Environment env; - - public RuntimeExecutionContext(Environment env) { - this.env = env; - } - - - @Override - public String getTaskName() { - return this.env.getTaskName(); - } - - - @Override - public int getNumberOfSubtasks() { - return this.env.getCurrentNumberOfSubtasks(); - } - - - @Override - public int getSubtaskIndex() { - return this.env.getIndexInSubtaskGroup() + 1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java index aaad08c..99a59b1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java @@ -16,11 +16,8 @@ * limitations under the License. */ - package org.apache.flink.test.operators; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.record.functions.CrossFunction; import org.apache.flink.api.java.record.io.DelimitedInputFormat; @@ -35,8 +32,6 @@ import org.apache.flink.test.util.RecordAPITestBase; import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -47,17 +42,12 @@ import java.io.Serializable; import java.util.Collection; import java.util.LinkedList; -/** - */ @RunWith(Parameterized.class) -//@Ignore("Test needs to be adapted to new cross signature") public class CrossITCase extends RecordAPITestBase { - private static final Log LOG = LogFactory.getLog(CrossITCase.class); - - String leftInPath = null; - String rightInPath = null; - String resultPath = null; + private String leftInPath = null; + private String rightInPath = null; + private String resultPath = null; public CrossITCase(Configuration testConfig) { super(testConfig); @@ -112,8 +102,6 @@ public class CrossITCase extends RecordAPITestBase { int key1 = Integer.parseInt(string.toString()); string = record2.getField(0, string); int key2 = Integer.parseInt(string.toString()); - - LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }"); string.setValue((key1 + key2 + 2) + ""); integer.setValue(val2 - val1 + 1);
