http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index 5ca1068..a07a157 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -22,8 +22,10 @@ import java.security.InvalidParameterException; import java.util.Arrays; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.GenericJoiner; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Operator; @@ -32,19 +34,22 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder; -import org.apache.flink.api.java.functions.JoinFunction; +import org.apache.flink.api.java.functions.RichFlatJoinFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; import org.apache.flink.api.java.operators.Keys.FieldPositionKeys; import 
org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingJoinOperator; import org.apache.flink.api.java.operators.translation.TupleKeyExtractingMapper; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.types.TypeInformation; //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import org.apache.flink.api.java.tuple.*; +import org.apache.flink.util.Collector; //CHECKSTYLE.ON: AvoidStarImport /** @@ -147,12 +152,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @param <I2> The type of the second input DataSet of the Join transformation. * @param <OUT> The type of the result of the Join transformation. * - * @see JoinFunction + * @see org.apache.flink.api.java.functions.RichFlatJoinFunction * @see DataSet */ public static class EquiJoin<I1, I2, OUT> extends JoinOperator<I1, I2, OUT> { - private final JoinFunction<I1, I2, OUT> function; + private final FlatJoinFunction<I1, I2, OUT> function; @SuppressWarnings("unused") private boolean preserve1; @@ -160,7 +165,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, private boolean preserve2; protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2, - Keys<I1> keys1, Keys<I2> keys2, JoinFunction<I1, I2, OUT> function, + Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function, TypeInformation<OUT> returnType, JoinHint hint) { super(input1, input2, keys1, keys2, returnType, hint); @@ -171,14 +176,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, this.function = function; - if (!(function instanceof ProjectJoinFunction)) { + if (!(function instanceof ProjectFlatJoinFunction)) { extractSemanticAnnotationsFromUdf(function.getClass()); } else { - 
generateProjectionProperties(((ProjectJoinFunction<?, ?, ?>) function)); + generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) function)); } } - public void generateProjectionProperties(ProjectJoinFunction<?, ?, ?> pjf) { + protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2, + Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function, + TypeInformation<OUT> returnType, JoinHint hint) + { + super(input1, input2, keys1, keys2, returnType, hint); + + if (function == null) { + throw new NullPointerException(); + } + + this.function = generatedFunction; + + if (!(generatedFunction instanceof ProjectFlatJoinFunction)) { + extractSemanticAnnotationsFromUdf(function.getClass()); + } else { + generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) generatedFunction)); + } + } + + public void generateProjectionProperties(ProjectFlatJoinFunction<?, ?, ?> pjf) { DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst()); setSemanticProperties(props); } @@ -238,8 +262,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, int[] logicalKeyPositions1 = super.keys1.computeLogicalKeyPositions(); int[] logicalKeyPositions2 = super.keys2.computeLogicalKeyPositions(); - JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>> po = - new JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>>(function, + JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>> po = + new JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()), logicalKeyPositions1, logicalKeyPositions2, name); @@ -298,7 +322,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoin( 
Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2, - JoinFunction<I1, I2, OUT> function, + FlatJoinFunction<I1, I2, OUT> function, TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name, Operator<I1> input1, Operator<I2> input2) { @@ -313,10 +337,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor()); final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor()); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); join.setFirstInput(keyMapper1); @@ -333,7 +357,7 @@ public abstract class JoinOperator<I1, I2, OUT> 
extends TwoInputUdfOperator<I1, private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinRight( int[] logicalKeyPositions1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2, - JoinFunction<I1, I2, OUT> function, + FlatJoinFunction<I1, I2, OUT> function, TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name, Operator<I1> input1, Operator<I2> input2) { @@ -350,10 +374,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, final TupleKeyExtractingMapper<I1, K> extractor1 = new TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]); final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor()); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, keys2, name, 
outputType, typeInfoWithKey1, typeInfoWithKey2); @@ -371,7 +395,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinLeft( Keys.SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2, - JoinFunction<I1, I2, OUT> function, + FlatJoinFunction<I1, I2, OUT> function, TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name, Operator<I1> input1, Operator<I2> input2) { @@ -388,10 +412,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor()); final TupleKeyExtractingMapper<I2, K> extractor2 = new TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]); - final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 = - new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); - final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 = - new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); + final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1"); + final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 = + new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2"); final 
PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2); @@ -424,28 +448,73 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint) { super(input1, input2, keys1, keys2, - (JoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultJoinFunction<I1, I2>(), + (RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(), new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint); } /** - * Finalizes a Join transformation by applying a {@link JoinFunction} to each pair of joined elements.<br/> + * Finalizes a Join transformation by applying a {@link org.apache.flink.api.java.functions.RichFlatJoinFunction} to each pair of joined elements.<br/> * Each JoinFunction call returns exactly one element. * * @param function The JoinFunction that is called for each pair of joined elements. 
* @return An EquiJoin that represents the joined result DataSet * - * @see JoinFunction + * @see org.apache.flink.api.java.functions.RichFlatJoinFunction * @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin * @see DataSet */ - public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) { + public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> function) { if (function == null) { throw new NullPointerException("Join function must not be null."); } + if (FunctionUtils.isSerializedLambdaFunction(function)) { + throw new UnsupportedLambdaExpressionException(); + } TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type()); return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint()); } + + public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) { + if (function == null) { + throw new NullPointerException("Join function must not be null."); + } + if (FunctionUtils.isSerializedLambdaFunction(function)) { + throw new UnsupportedLambdaExpressionException(); + } + FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function); + TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type()); + return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint()); + } + + private static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> { + + private static final long serialVersionUID = 1L; + + private WrappingFlatJoinFunction(JoinFunction<IN1, IN2, OUT> wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void join(IN1 left, IN2 right, Collector<OUT> out) throws Exception { + out.collect (this.wrappedFunction.join(left, right)); + } + } + + /* + private static class 
GeneratedFlatJoinFunction<IN1, IN2, OUT> extends FlatJoinFunction<IN1, IN2, OUT> { + + private Joinable<IN1,IN2,OUT> function; + + private GeneratedFlatJoinFunction(Joinable<IN1, IN2, OUT> function) { + this.function = function; + } + + @Override + public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception { + out.collect(function.join(first, second)); + } + } + */ /** * Initiates a ProjectJoin transformation and projects the first join input<br/> @@ -530,7 +599,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) { super(input1, input2, keys1, keys2, - new ProjectJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), + new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType, hint); } @@ -821,20 +890,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, // default join functions // -------------------------------------------------------------------------------------------- - public static final class DefaultJoinFunction<T1, T2> extends JoinFunction<T1, T2, Tuple2<T1, T2>> { + public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> { private static final long serialVersionUID = 1L; private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>(); @Override - public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception { + public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> out) throws Exception { outTuple.f0 = first; outTuple.f1 = second; - return outTuple; + out.collect(outTuple); } } - public static final class ProjectJoinFunction<T1, T2, R extends Tuple> extends JoinFunction<T1, T2, R> { + public static final class 
ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> { private static final long serialVersionUID = 1L; @@ -851,7 +920,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, * @param isFromFirst List of flags indicating whether the field should be copied from the first (true) or the second (false) input. * @param outTupleInstance An instance of an output tuple. */ - private ProjectJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) { + private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) { if(fields.length != isFromFirst.length) { throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!"); @@ -869,7 +938,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, return isFromFirst; } - public R join(T1 in1, T2 in2) { + public void join(T1 in1, T2 in2, Collector<R> out) { for(int i=0; i<fields.length; i++) { if(isFromFirst[i]) { if(fields[i] >= 0) { @@ -885,27 +954,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, } } } - return outTuple; + out.collect(outTuple); } } - public static final class LeftSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T1> { + public static final class LeftSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T1> { private static final long serialVersionUID = 1L; @Override - public T1 join(T1 left, T2 right) throws Exception { - return left; + //public T1 join(T1 left, T2 right) throws Exception { + // return left; + //} + public void join (T1 left, T2 right, Collector<T1> out) { + out.collect(left); } } - public static final class RightSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T2> { + public static final class RightSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T2> { private static final long serialVersionUID = 1L; @Override - public T2 join(T1 left, T2 right) throws Exception { - 
return right; + //public T2 join(T1 left, T2 right) throws Exception { + // return right; + //} + public void join (T1 left, T2 right, Collector<T2> out) { + out.collect(right); } }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java index 03c6037..eccdeec 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java @@ -18,11 +18,10 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.java.functions.MapFunction; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.DataSet; @@ -34,7 +33,7 @@ import org.apache.flink.api.java.DataSet; * @param <IN> The type of the data set consumed by the operator. * @param <OUT> The type of the data set created by the operator. 
* - * @see MapFunction + * @see org.apache.flink.api.common.functions.MapFunction */ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> { @@ -42,6 +41,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) { + super(input, TypeExtractor.getMapReturnTypes(function, input.getType())); this.function = function; @@ -49,11 +49,11 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe } @Override - protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> translateToDataFlow(Operator<IN> input) { + protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : function.getClass().getName(); // create operator - MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> po = new MapOperatorBase<IN, OUT, GenericMap<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); + MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name); // set input po.setInput(input); // set dop http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java index 9e94670..dd5a3bd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java @@ -20,7 
+20,7 @@ package org.apache.flink.api.java.operators; import java.util.Arrays; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.SemanticPropUtil; @@ -51,7 +51,7 @@ public class ProjectOperator<IN, OUT extends Tuple> } @Override - protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN,OUT>> translateToDataFlow(Operator<IN> input) { + protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? getName() : "Projection " + Arrays.toString(fields); // create operator PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java deleted file mode 100644 index d88d43d..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators; - -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; -import org.apache.flink.api.common.functions.GenericMap; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable; -import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; -import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.types.TypeInformation; - -import org.apache.flink.api.java.DataSet; - -/** - * This operator represents the application of a "reduceGroup" function on a data set, and the - * result data set produced by the function. - * - * @param <IN> The type of the data set consumed by the operator. - * @param <OUT> The type of the data set created by the operator. 
- */ -public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, ReduceGroupOperator<IN, OUT>> { - - private final GroupReduceFunction<IN, OUT> function; - - private final Grouping<IN> grouper; - - private boolean combinable; - - - /** - * Constructor for a non-grouped reduce (all reduce). - * - * @param input The input data set to the groupReduce function. - * @param function The user-defined GroupReduce function. - */ - public ReduceGroupOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) { - super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType())); - - this.function = function; - this.grouper = null; - checkCombinability(); - } - - /** - * Constructor for a grouped reduce. - * - * @param input The grouped input to be processed group-wise by the groupReduce function. - * @param function The user-defined GroupReduce function. - */ - public ReduceGroupOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) { - super(input != null ? 
input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType())); - - this.function = function; - this.grouper = input; - checkCombinability(); - - extractSemanticAnnotationsFromUdf(function.getClass()); - } - - private void checkCombinability() { - if (function instanceof GenericCombine && function.getClass().getAnnotation(Combinable.class) != null) { - this.combinable = true; - } - } - - // -------------------------------------------------------------------------------------------- - // Properties - // -------------------------------------------------------------------------------------------- - - public boolean isCombinable() { - return combinable; - } - - public void setCombinable(boolean combinable) { - // sanity check that the function is a subclass of the combine interface - if (combinable && !(function instanceof GenericCombine)) { - throw new IllegalArgumentException("The function does not implement the combine interface."); - } - - this.combinable = combinable; - } - - @Override - protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) { - - String name = getName() != null ? 
getName() : function.getClass().getName(); - - // distinguish between grouped reduce and non-grouped reduce - if (grouper == null) { - // non grouped reduce - UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po = - new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, new int[0], name); - - po.setCombinable(combinable); - // set input - po.setInput(input); - // the degree of parallelism for a non grouped reduce can only be 1 - po.setDegreeOfParallelism(1); - return po; - } - - if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) { - - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys(); - - PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer( - selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable()); - - po.setDegreeOfParallelism(this.getParallelism()); - - return po; - } - else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) { - - int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po = - new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); - - po.setCombinable(combinable); - po.setInput(input); - po.setDegreeOfParallelism(this.getParallelism()); - - // set group order - if (grouper instanceof SortedGrouping) { - SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper; - - int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions(); - Order[] sortOrders = sortedGrouper.getGroupSortOrders(); - - Ordering o = new Ordering(); - for(int i=0; i < 
sortKeyPositions.length; i++) { - o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]); - } - po.setGroupOrder(o); - } - - return po; - } - else if (grouper.getKeys() instanceof Keys.ExpressionKeys) { - - int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()); - GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po = - new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name); - - po.setCombinable(combinable); - po.setInput(input); - po.setDegreeOfParallelism(this.getParallelism()); - - return po; - } - else { - throw new UnsupportedOperationException("Unrecognized key type."); - } - - } - - - // -------------------------------------------------------------------------------------------- - - private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionReducer( - Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function, - TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input, - boolean combinable) - { - @SuppressWarnings("unchecked") - final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys; - - TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType); - - KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor()); - - PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable); - - MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key 
Extractor"); - - reducer.setInput(mapper); - mapper.setInput(input); - - // set the mapper's parallelism to the input parallelism to make sure it is chained - mapper.setDegreeOfParallelism(input.getDegreeOfParallelism()); - - return reducer; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 12e0f89..13a6c91 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -18,13 +18,12 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.GenericMap; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; -import org.apache.flink.api.java.functions.ReduceFunction; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator; @@ -40,7 +39,7 @@ import org.apache.flink.api.java.DataSet; * * @param <IN> The type of the data set reduced by the operator. 
* - * @see ReduceFunction + * @see org.apache.flink.api.common.functions.ReduceFunction */ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> { @@ -83,8 +82,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe if (grouper == null) { // non grouped reduce UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType()); - ReduceOperatorBase<IN, GenericReduce<IN>> po = - new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, new int[0], name); + ReduceOperatorBase<IN, ReduceFunction<IN>> po = + new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name); // set input po.setInput(input); @@ -109,8 +108,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe // reduce with field positions int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType()); - ReduceOperatorBase<IN, GenericReduce<IN>> po = - new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, logicalKeyPositions, name); + ReduceOperatorBase<IN, ReduceFunction<IN>> po = + new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name); // set input po.setInput(input); @@ -139,8 +138,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey); - MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor"); - MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>> keyRemovingMap = new 
MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor"); + MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor"); + MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor"); keyExtractingMap.setInput(input); reducer.setInput(keyExtractingMap); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java index fa2c1aa..dcdbed4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java @@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet; /** * The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that - * have one input (such as {@link org.apache.flink.api.java.functions.MapFunction} or - * {@link org.apache.flink.api.java.functions.ReduceFunction}). + * have one input (such as {@link org.apache.flink.api.java.functions.RichMapFunction} or + * {@link org.apache.flink.api.java.functions.RichReduceFunction}). 
* <p> * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization * through configuration objects, and semantic properties. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 89c8bb2..97b2417 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators; import java.util.Arrays; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.GroupReduceFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; /** * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/> * The following transformation can be applied on sorted groups: * <ul> - * <li>{@link SortedGrouping#reduceGroup(GroupReduceFunction)},</li> + * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li> * </ul> * * @param <T> The type of the elements of the sorted and grouped DataSet. @@ -65,23 +67,27 @@ public class SortedGrouping<T> extends Grouping<T> { /** * Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br/> - * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet. 
+ * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet. * A GroupReduceFunction can iterate over all elements of a group and emit any * number of output elements including none. * * @param reducer The GroupReduceFunction that is applied on each group of the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. * - * @see GroupReduceFunction - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.functions.RichGroupReduceFunction + * @see GroupReduceOperator * @see DataSet */ - public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { + public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - return new ReduceGroupOperator<T, R>(this, reducer); + if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + throw new UnsupportedLambdaExpressionException(); + } + return new GroupReduceOperator<T, R>(this, reducer); } + // -------------------------------------------------------------------------------------------- // Group Operations http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java index a85ca3f..f347fef 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java @@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet; /** * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute * 
user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that - * have two inputs (such as {@link org.apache.flink.api.java.functions.JoinFunction} or - * {@link org.apache.flink.api.java.functions.CoGroupFunction}). + * have two inputs (such as {@link org.apache.flink.api.java.functions.RichJoinFunction} or + * {@link org.apache.flink.api.java.functions.RichCoGroupFunction}). * <p> * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization * through configuration objects, and semantic properties. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java index 2040a27..bf33f4e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java @@ -27,8 +27,8 @@ import org.apache.flink.api.java.DataSet; /** * This interface marks operators as operators that execute user-defined functions (UDFs), such as - * {@link org.apache.flink.api.java.functions.MapFunction}, {@link org.apache.flink.api.java.functions.ReduceFunction}, - * or {@link org.apache.flink.api.java.functions.CoGroupFunction}. + * {@link org.apache.flink.api.java.functions.RichMapFunction}, {@link org.apache.flink.api.java.functions.RichReduceFunction}, + * or {@link org.apache.flink.api.java.functions.RichCoGroupFunction}. * The UDF operators stand in contrast to operators that execute built-in operations, like aggregations. 
*/ public interface UdfOperator<O extends UdfOperator<O>> { @@ -39,7 +39,7 @@ public interface UdfOperator<O extends UdfOperator<O>> { /** * Gets the configuration parameters that will be passed to the UDF's open method - * {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)}. + * {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)}. * The configuration is set via the {@link #withParameters(Configuration)} * method. * @@ -69,7 +69,7 @@ public interface UdfOperator<O extends UdfOperator<O>> { /** * Sets the configuration parameters for the UDF. These are optional parameters that are passed - * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)} method. + * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)} method. * * @param parameters The configuration parameters for the UDF. * @return The operator itself, to allow chaining function calls. @@ -83,7 +83,7 @@ public interface UdfOperator<O extends UdfOperator<O>> { * {@link org.apache.flink.api.common.functions.RuntimeContext#getBroadcastVariable(String)}. * * The runtime context itself is available in all UDFs via - * {@link org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}. + * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()}. * * @param data The data set to be broadcasted. * @param name The name under which the broadcast data set retrieved. 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 1d9d70d..9e71ba0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -18,12 +18,14 @@ package org.apache.flink.api.java.operators; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException; public class UnsortedGrouping<T> extends Grouping<T> { @@ -90,14 +92,14 @@ public class UnsortedGrouping<T> extends Grouping<T> { /** * Applies a Reduce transformation on a grouped {@link DataSet}.<br/> - * For each group, the transformation consecutively calls a {@link ReduceFunction} + * For each group, the transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction} * until only a single element for each group remains. * A ReduceFunction combines two elements into one new element of the same type. * * @param reducer The ReduceFunction that is applied on each group of the DataSet. * @return A ReduceOperator that represents the reduced DataSet. 
* - * @see ReduceFunction + * @see org.apache.flink.api.java.functions.RichReduceFunction * @see ReduceOperator * @see DataSet */ @@ -110,24 +112,28 @@ public class UnsortedGrouping<T> extends Grouping<T> { /** * Applies a GroupReduce transformation on a grouped {@link DataSet}.<br/> - * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet. + * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet. * A GroupReduceFunction can iterate over all elements of a group and emit any * number of output elements including none. * * @param reducer The GroupReduceFunction that is applied on each group of the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. * - * @see GroupReduceFunction - * @see ReduceGroupOperator + * @see org.apache.flink.api.java.functions.RichGroupReduceFunction + * @see GroupReduceOperator * @see DataSet */ - public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { + public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) { if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - return new ReduceGroupOperator<T, R>(this, reducer); + if (FunctionUtils.isSerializedLambdaFunction(reducer)) { + throw new UnsupportedLambdaExpressionException(); + } + return new GroupReduceOperator<T, R>(this, reducer); } + // -------------------------------------------------------------------------------------------- // Group Operations // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java index aea99e3..c7f65f0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java @@ -19,11 +19,11 @@ package org.apache.flink.api.java.operators.translation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; -public final class KeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, T>> { +public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java index 52cbcd3..a6cd837 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java @@ -18,11 +18,11 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; -public final class KeyRemovingMapper<T, K> extends MapFunction<Tuple2<K, T>, T> { +public final class 
KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java index 7fb9c0f..8ac2d01 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java @@ -18,22 +18,22 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.FilterOperatorBase; -import org.apache.flink.api.java.functions.FilterFunction; import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; -public class PlanFilterOperator<T> extends FilterOperatorBase<T, GenericFlatMap<T, T>> { +public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> { public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) { super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name); } public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>> - implements GenericFlatMap<T, T> + implements FlatMapFunction<T, T> { private static final long serialVersionUID = 1L; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java index 521814c..4de7311 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java @@ -18,22 +18,22 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.types.TypeInformation; -public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, GenericMap<T, R>> { +public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> { public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType) { super(new MapProjector<T, R>(fields, outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name); } public static final class MapProjector<T, R extends Tuple> - extends AbstractFunction - implements GenericMap<T, R> + extends AbstractRichFunction + implements MapFunction<T, R> { private static final long serialVersionUID = 1L; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java index 20bd3b0..89290f0 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java @@ -20,20 +20,19 @@ package org.apache.flink.api.java.operators.translation; import java.util.Iterator; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; -import org.apache.flink.api.java.functions.CoGroupFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> - extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> + extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> { - public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, + public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> 
typeInfoWithKey2) { @@ -42,7 +41,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name); } - public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, + public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) { @@ -51,7 +50,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> new int[]{0}, key2.computeLogicalKeyPositions(), name); } - public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, + public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) { @@ -63,7 +62,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> // -------------------------------------------------------------------------------------------- public static final class TupleUnwrappingCoGrouper<I1, I2, OUT, K> extends WrappingFunction<CoGroupFunction<I1, I2, OUT>> - implements GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT> + implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java index c121efe..73ea004 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java @@ -18,20 +18,19 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.JoinOperatorBase; -import org.apache.flink.api.java.functions.JoinFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.TypeInformation; import org.apache.flink.util.Collector; public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> - extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> + extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> { - public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, + public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf, Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) { @@ -40,7 +39,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name); } - public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, + public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf, int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) { @@ -49,7 +48,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> 
new int[]{0}, key2.computeLogicalKeyPositions(), name); } - public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, + public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf, Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name, TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) { @@ -59,21 +58,26 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> } public static final class TupleUnwrappingJoiner<I1, I2, OUT, K> - extends WrappingFunction<JoinFunction<I1, I2, OUT>> - implements GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT> + extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>> + implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> { private static final long serialVersionUID = 1L; - private TupleUnwrappingJoiner(JoinFunction<I1, I2, OUT> wrapped) { + private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) { super(wrapped); } + //@SuppressWarnings("unchecked") + //@Override + //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception { + // return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1))); + //} + @SuppressWarnings("unchecked") @Override - public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2, - Collector<OUT> out) throws Exception { - out.collect(wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1)))); + public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception { + wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index 5a59664..5e80455 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -20,12 +20,11 @@ package org.apache.flink.api.java.operators.translation; import java.util.Iterator; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; -import org.apache.flink.api.java.functions.GroupReduceFunction; -import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.TypeInformation; @@ -35,12 +34,12 @@ import org.apache.flink.util.Collector; * A reduce operator that takes 2-tuples (key-value pairs), and applies the group reduce operation only * on the unwrapped values. 
*/ -public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GenericGroupReduce<Tuple2<K, IN>,OUT>> { +public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> { public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingCombinableGroupReducer<IN, OUT, K>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), + super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); super.setCombinable(combinable); @@ -48,9 +47,9 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp // -------------------------------------------------------------------------------------------- - @Combinable - public static final class TupleUnwrappingCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>> - implements GenericGroupReduce<Tuple2<K, IN>, OUT>, GenericCombine<Tuple2<K, IN>> + @RichGroupReduceFunction.Combinable + public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> + implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>> { private static final long serialVersionUID = 1L; @@ -58,7 +57,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp private TupleUnwrappingIterator<IN, K> iter; private TupleWrappingCollector<IN, K> coll; - private 
TupleUnwrappingCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) { + private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { super(wrapped); this.iter = new TupleUnwrappingIterator<IN, K>(); this.coll = new TupleWrappingCollector<IN, K>(this.iter); @@ -85,7 +84,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp } public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>> - implements GenericGroupReduce<Tuple2<K, IN>, OUT> + implements GroupReduceFunction<Tuple2<K, IN>, OUT> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java index 66aa430..4da981c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java @@ -18,10 +18,9 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; -import org.apache.flink.api.java.functions.ReduceFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.TypeInformation; @@ -31,7 +30,7 @@ import 
org.apache.flink.types.TypeInformation; * A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only * on the unwrapped values. */ -public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, GenericReduce<Tuple2<K, T>>> { +public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> { public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name, TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey) @@ -40,7 +39,7 @@ public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple } public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>> - implements GenericReduce<Tuple2<K, T>> + implements ReduceFunction<Tuple2<K, T>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java index a915d1c..ecac775 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java @@ -18,12 +18,12 @@ package org.apache.flink.api.java.operators.translation; -import org.apache.flink.api.java.functions.MapFunction; +import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -public final class TupleKeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, 
T>> { +public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java index c98df6b..267d879 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java @@ -29,20 +29,21 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.types.Value; -public abstract class WrappingFunction<T extends AbstractFunction> extends AbstractFunction { +public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction { private static final long serialVersionUID = 1L; protected final T wrappedFunction; - - + protected WrappingFunction(T wrappedFunction) { this.wrappedFunction = wrappedFunction; } @@ -50,12 +51,12 @@ public abstract class WrappingFunction<T extends 
AbstractFunction> extends Abstr @Override public void open(Configuration parameters) throws Exception { - this.wrappedFunction.open(parameters); + FunctionUtils.openFunction(this.wrappedFunction, parameters); } @Override public void close() throws Exception { - this.wrappedFunction.close(); + FunctionUtils.closeFunction(this.wrappedFunction); } @Override @@ -63,13 +64,16 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr super.setRuntimeContext(t); if (t instanceof IterationRuntimeContext) { - this.wrappedFunction.setRuntimeContext(new WrappingIterationRuntimeContext(t)); + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingIterationRuntimeContext(t)); } else{ - this.wrappedFunction.setRuntimeContext(new WrappingRuntimeContext(t)); + FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingRuntimeContext(t)); } } - + + public T getWrappedFunction () { + return this.wrappedFunction; + } private static class WrappingRuntimeContext implements RuntimeContext { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java index 633adab..fa0ca11 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java @@ -21,15 +21,14 @@ package org.apache.flink.api.java.record.functions; import java.util.Iterator; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.AbstractRichFunction; import 
org.apache.flink.types.Record; import org.apache.flink.util.Collector; /** * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}. */ -public abstract class CoGroupFunction extends AbstractFunction implements GenericCoGrouper<Record, Record, Record> { +public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java index b2185a2..c4587fd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java @@ -19,15 +19,13 @@ package org.apache.flink.api.java.record.functions; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCrosser; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; /** * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}. 
*/ -public abstract class CrossFunction extends AbstractFunction implements GenericCrosser<Record, Record, Record> { +public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; @@ -35,14 +33,19 @@ public abstract class CrossFunction extends AbstractFunction implements GenericC * This method must be implemented to provide a user implementation of a cross. * It is called for each element of the Cartesian product of both input sets. - * @param record1 The record from the second input. - * @param record2 The record from the second input. - * @param out A collector that collects all output records. + * @param first The record from the first input. + * @param second The record from the second input. + * @return The result of the cross UDF * * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the * runtime catches an exception, it aborts the task and lets the fail-over logic * decide whether to retry the task execution.
*/ + + //@Override + //public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception; + @Override - public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception; + public abstract Record cross(Record first, Record second) throws Exception; + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java index 0222c63..dce24a3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java @@ -19,8 +19,8 @@ package org.apache.flink.api.java.record.functions; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; @@ -28,7 +28,7 @@ import org.apache.flink.util.Collector; * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}. * It resembles an equality join of both inputs on their key fields. 
*/ -public abstract class JoinFunction extends AbstractFunction implements GenericJoiner<Record, Record, Record> { +public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> { private static final long serialVersionUID = 1L; @@ -38,7 +38,7 @@ public abstract class JoinFunction extends AbstractFunction implements GenericJo * * @param value1 The record that comes from the first input. * @param value2 The record that comes from the second input. - * @param out A collector that collects all output pairs. + * @return The result of the join UDF as record * * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the * runtime catches an exception, it aborts the combine task and lets the fail-over logic http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java index 88b6282..99c945d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java @@ -19,7 +19,7 @@ package org.apache.flink.api.java.record.functions; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; @@ -28,7 +28,7 @@ import org.apache.flink.util.Collector; * The MapFunction must be extended to provide a mapper implementation * By definition, the mapper is called for each individual input record. 
*/ -public abstract class MapFunction extends AbstractFunction implements GenericCollectorMap<Record, Record> { +public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java index 4b1dbb3..073b11a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java @@ -21,9 +21,9 @@ package org.apache.flink.api.java.record.functions; import java.util.Iterator; -import org.apache.flink.api.common.functions.AbstractFunction; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; @@ -31,7 +31,7 @@ import org.apache.flink.util.Collector; * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a * {@link org.apache.flink.api.java.operators.ReduceOperator}. 
*/ -public abstract class ReduceFunction extends AbstractFunction implements GenericGroupReduce<Record, Record>, GenericCombine<Record> { +public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> { private static final long serialVersionUID = 1L;
