Repository: flink Updated Branches: refs/heads/master 153a67881 -> 544abb937
http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 1193da5..6791741 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -18,20 +18,16 @@ package org.apache.flink.api.java.operators; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; -import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; +import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.DataSet; /** @@ -89,9 +85,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe // offset semantic information by extracted key fields if(props != null && this.grouper != null && - this.grouper.keys instanceof Keys.SelectorFunctionKeys) { + this.grouper.keys instanceof SelectorFunctionKeys) { - int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields(); + int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields(); if(this.grouper instanceof SortedGrouping) { offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); } @@ -109,9 +105,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe // distinguish between grouped reduce and non-grouped reduce if (grouper == null) { // non grouped reduce - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType()); ReduceOperatorBase<IN, ReduceFunction<IN>> po = - new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name); + new ReduceOperatorBase<>(function, operatorInfo, new int[0], name); po.setInput(input); // the parallelism for a non grouped reduce can only be 1 @@ -120,13 +116,14 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe return po; } - if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) { + if (grouper.getKeys() instanceof SelectorFunctionKeys) { // reduce with key selector function @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys(); - - MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, input, getParallelism()); + SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys(); + + org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> po = + translateSelectorFunctionReducer(selectorKeys, function, getInputType(), name, input, getParallelism()); ((PlanUnwrappingReduceOperator<?, ?>) po.getInput()).setCustomPartitioner(grouper.getCustomPartitioner()); return po; @@ -135,9 +132,9 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe // reduce with field positions int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType()); ReduceOperatorBase<IN, ReduceFunction<IN>> po = - new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name); + new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name); po.setCustomPartitioner(grouper.getCustomPartitioner()); @@ -153,30 +150,24 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe // -------------------------------------------------------------------------------------------- - private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys, - ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int parallelism) + private static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateSelectorFunctionReducer( + SelectorFunctionKeys<T, ?> rawKeys, + ReduceFunction<T> function, + TypeInformation<T> inputType, + String name, + Operator<T> input, + int parallelism) { @SuppressWarnings("unchecked") - final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys; - - TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType); - - KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor()); + final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys; - PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey); + TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys); + Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys); - MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor"); - MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor"); - - keyExtractingMap.setInput(input); - reducer.setInput(keyExtractingMap); - keyRemovingMap.setInput(reducer); - - // set parallelism - keyExtractingMap.setParallelism(input.getParallelism()); + PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<>(function, keys, name, inputType, typeInfoWithKey); + reducer.setInput(keyedInput); reducer.setParallelism(parallelism); - keyRemovingMap.setParallelism(parallelism); - - return keyRemovingMap; + + return SelectorFunctionKeys.appendKeyRemover(reducer, keys); } } http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index b488dd1..6092d14 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.Utils; @@ -124,6 +125,16 @@ public class SortedGrouping<T> extends Grouping<T> { protected Order[] getGroupSortOrders() { return this.groupSortOrders; } + + protected Ordering getGroupOrdering() { + + Ordering o = new Ordering(); + for(int i=0; i < this.groupSortKeyPositions.length; i++) { + o.appendOrdering(this.groupSortKeyPositions[i], null, this.groupSortOrders[i]); + } + + return o; + } /** * Uses a custom partitioner for the grouping.