[FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors
This closes #532
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f39aec82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f39aec82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f39aec82
Branch: refs/heads/master
Commit: f39aec82d6cd1286f129b11366d101cb646716ee
Parents: dda8565
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Mar 24 15:18:21 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Apr 3 20:41:40 2015 +0200
----------------------------------------------------------------------
.../api/java/functions/SemanticPropUtil.java | 79 ++
.../api/java/operators/CoGroupOperator.java | 25 +
.../java/operators/GroupCombineOperator.java | 23 +
.../api/java/operators/GroupReduceOperator.java | 22 +
.../flink/api/java/operators/JoinOperator.java | 23 +
.../api/java/operators/ReduceOperator.java | 22 +
.../translation/TwoKeyExtractingMapper.java | 3 +-
.../java/functions/SemanticPropUtilTest.java | 1008 ++++++++++--------
.../api/java/operator/CoGroupOperatorTest.java | 132 +++
.../java/operator/GroupCombineOperatorTest.java | 345 ++++++
.../java/operator/GroupReduceOperatorTest.java | 345 ++++++
.../api/java/operator/JoinOperatorTest.java | 183 +++-
.../api/java/operator/ReduceOperatorTest.java | 238 +++++
.../javaApiOperators/GroupReduceITCase.java | 58 +
14 files changed, 2054 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 3343671..4569be3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -153,6 +153,85 @@ public class SemanticPropUtil { return dsp; } + /** + * Creates SemanticProperties by adding an offset to each input field index of the given SemanticProperties. + * + * @param props The SemanticProperties to which the offset is added. + * @param numInputFields The original number of fields of the input. + * @param offset The offset that is added to each input field index. + * @return New SemanticProperties with added offset. + */ + public static SingleInputSemanticProperties addSourceFieldOffset(SingleInputSemanticProperties props, + int numInputFields, int offset) { + + SingleInputSemanticProperties offsetProps = new SingleInputSemanticProperties(); + if (props.getReadFields(0) != null) { + FieldSet offsetReadFields = new FieldSet(); + for (int r : props.getReadFields(0)) { + offsetReadFields = offsetReadFields.addField(r + offset); + } + offsetProps.addReadFields(offsetReadFields); + } + for (int s = 0; s < numInputFields; s++) { + FieldSet targetFields = props.getForwardingTargetFields(0, s); + for (int t : targetFields) { + offsetProps.addForwardedField(s + offset, t); + } + } + return offsetProps; + } + + /** + * Creates SemanticProperties by adding offsets to each input field index of the given SemanticProperties. + * + * @param props The SemanticProperties to which the offset is added. + * @param numInputFields1 The original number of fields of the first input. + * @param numInputFields2 The original number of fields of the second input. + * @param offset1 The offset that is added to each input field index of the first input. + * @param offset2 The offset that is added to each input field index of the second input. + * @return New SemanticProperties with added offsets. 
+ */ + public static DualInputSemanticProperties addSourceFieldOffsets(DualInputSemanticProperties props, + int numInputFields1, int numInputFields2, + int offset1, int offset2) { + + DualInputSemanticProperties offsetProps = new DualInputSemanticProperties(); + + // add offset to read fields on first input + if(props.getReadFields(0) != null) { + FieldSet offsetReadFields = new FieldSet(); + for(int r : props.getReadFields(0)) { + offsetReadFields = offsetReadFields.addField(r+offset1); + } + offsetProps.addReadFields(0, offsetReadFields); + } + // add offset to read fields on second input + if(props.getReadFields(1) != null) { + FieldSet offsetReadFields = new FieldSet(); + for(int r : props.getReadFields(1)) { + offsetReadFields = offsetReadFields.addField(r+offset2); + } + offsetProps.addReadFields(1, offsetReadFields); + } + + // add offset to forward fields on first input + for(int s = 0; s < numInputFields1; s++) { + FieldSet targetFields = props.getForwardingTargetFields(0, s); + for(int t : targetFields) { + offsetProps.addForwardedField(0, s + offset1, t); + } + } + // add offset to forward fields on second input + for(int s = 0; s < numInputFields2; s++) { + FieldSet targetFields = props.getForwardingTargetFields(1, s); + for(int t : targetFields) { + offsetProps.addForwardedField(1, s + offset2, t); + } + } + + return offsetProps; + } + public static SingleInputSemanticProperties getSemanticPropsSingle( Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) { if (set == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java index a051eb0..115a238 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; @@ -40,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.Keys.ExpressionKeys; @@ -129,6 +131,29 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU return function; } + @Override + public DualInputSemanticProperties getSemanticProperties() { + + DualInputSemanticProperties props = super.getSemanticProperties(); + + // offset semantic information by extracted key fields + if(props != null && + (this.keys1 instanceof Keys.SelectorFunctionKeys || + this.keys2 instanceof Keys.SelectorFunctionKeys)) { + + int numFields1 = this.getInput1Type().getTotalFields(); + int numFields2 = this.getInput2Type().getTotalFields(); + int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ? + ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0; + int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ? 
+ ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0; + + props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2); + } + + return props; + } + protected Keys<I1> getKeys1() { return this.keys1; } http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java index 911c608..dc26fec 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java @@ -23,11 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator; @@ -86,6 +88,27 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU return function; } + @Override + public 
SingleInputSemanticProperties getSemanticProperties() { + + SingleInputSemanticProperties props = super.getSemanticProperties(); + + // offset semantic information by extracted key fields + if(props != null && + this.grouper != null && + this.grouper.keys instanceof Keys.SelectorFunctionKeys) { + + int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields(); + if(this.grouper instanceof SortedGrouping) { + offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); + } + + props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset); + } + + return props; + } + // -------------------------------------------------------------------------------------------- // Translation // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index c96b7c6..bc4413f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import 
org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator; @@ -120,6 +122,26 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT return this; } + @Override + public SingleInputSemanticProperties getSemanticProperties() { + + SingleInputSemanticProperties props = super.getSemanticProperties(); + + // offset semantic information by extracted key fields + if(props != null && + this.grouper != null && + this.grouper.keys instanceof Keys.SelectorFunctionKeys) { + + int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields(); + if(this.grouper instanceof SortedGrouping) { + offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); + } + props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset); + } + + return props; + } + // -------------------------------------------------------------------------------------------- // Translation // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java index e450ae1..4adf6b3 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java @@ -225,6 +225,29 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, } @Override + public DualInputSemanticProperties getSemanticProperties() { + + DualInputSemanticProperties props = super.getSemanticProperties(); + + // offset semantic information by extracted key fields + if(props != null && + (this.keys1 instanceof Keys.SelectorFunctionKeys || + this.keys2 instanceof Keys.SelectorFunctionKeys)) { + + int numFields1 = this.getInput1Type().getTotalFields(); + int numFields2 = this.getInput2Type().getTotalFields(); + int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ? + ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0; + int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ? + ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0; + + props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2); + } + + return props; + } + + @Override protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) { if (function instanceof DefaultJoin.WrappingFlatJoinFunction) { return super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) function).getWrappedFunction().getClass()); http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java index 5951df8..e770278 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java @@ -21,10 +21,12 @@ package org.apache.flink.api.java.operators; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; import org.apache.flink.api.java.operators.translation.KeyRemovingMapper; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator; @@ -78,6 +80,26 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe } @Override + public SingleInputSemanticProperties getSemanticProperties() { + + SingleInputSemanticProperties props = super.getSemanticProperties(); + + // offset semantic information by extracted key fields + if(props != null && + this.grouper != null && + this.grouper.keys instanceof Keys.SelectorFunctionKeys) { + + int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields(); + if(this.grouper instanceof SortedGrouping) { + offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields(); + } + props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset); + } + + return props; + } + + @Override protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) { String name = getName() != null ? 
getName() : "Reduce at "+defaultName; http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java index 48d2d1a..a98c899 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java @@ -18,11 +18,12 @@ package org.apache.flink.api.java.operators.translation; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple3; - +@ForwardedFields("*->2") public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> { private static final long serialVersionUID = 1L;