[FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

This closes #532


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f39aec82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f39aec82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f39aec82

Branch: refs/heads/master
Commit: f39aec82d6cd1286f129b11366d101cb646716ee
Parents: dda8565
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Mar 24 15:18:21 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Apr 3 20:41:40 2015 +0200

----------------------------------------------------------------------
 .../api/java/functions/SemanticPropUtil.java    |   79 ++
 .../api/java/operators/CoGroupOperator.java     |   25 +
 .../java/operators/GroupCombineOperator.java    |   23 +
 .../api/java/operators/GroupReduceOperator.java |   22 +
 .../flink/api/java/operators/JoinOperator.java  |   23 +
 .../api/java/operators/ReduceOperator.java      |   22 +
 .../translation/TwoKeyExtractingMapper.java     |    3 +-
 .../java/functions/SemanticPropUtilTest.java    | 1008 ++++++++++--------
 .../api/java/operator/CoGroupOperatorTest.java  |  132 +++
 .../java/operator/GroupCombineOperatorTest.java |  345 ++++++
 .../java/operator/GroupReduceOperatorTest.java  |  345 ++++++
 .../api/java/operator/JoinOperatorTest.java     |  183 +++-
 .../api/java/operator/ReduceOperatorTest.java   |  238 +++++
 .../javaApiOperators/GroupReduceITCase.java     |   58 +
 14 files changed, 2054 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 3343671..4569be3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -153,6 +153,85 @@ public class SemanticPropUtil {
                return dsp;
        }
 
+       /**
+        * Creates SemanticProperties by adding an offset to each input field index of the given SemanticProperties.
+        *
+        * @param props The SemanticProperties to which the offset is added.
+        * @param numInputFields The original number of fields of the input.
+        * @param offset The offset that is added to each input field index.
+        * @return New SemanticProperties with added offset.
+        */
+       public static SingleInputSemanticProperties 
addSourceFieldOffset(SingleInputSemanticProperties props,
+                                                                               
                                                                int 
numInputFields, int offset) {
+
+               SingleInputSemanticProperties offsetProps = new 
SingleInputSemanticProperties();
+               if (props.getReadFields(0) != null) {
+                       FieldSet offsetReadFields = new FieldSet();
+                       for (int r : props.getReadFields(0)) {
+                               offsetReadFields = offsetReadFields.addField(r 
+ offset);
+                       }
+                       offsetProps.addReadFields(offsetReadFields);
+               }
+               for (int s = 0; s < numInputFields; s++) {
+                       FieldSet targetFields = 
props.getForwardingTargetFields(0, s);
+                       for (int t : targetFields) {
+                               offsetProps.addForwardedField(s + offset, t);
+                       }
+               }
+               return offsetProps;
+       }
+
+       /**
+        * Creates SemanticProperties by adding offsets to each input field index of the given SemanticProperties.
+        *
+        * @param props The SemanticProperties to which the offset is added.
+        * @param numInputFields1 The original number of fields of the first input.
+        * @param numInputFields2 The original number of fields of the second input.
+        * @param offset1 The offset that is added to each input field index of the first input.
+        * @param offset2 The offset that is added to each input field index of the second input.
+        * @return New SemanticProperties with added offsets.
+        */
+       public static DualInputSemanticProperties 
addSourceFieldOffsets(DualInputSemanticProperties props,
+                                                                               
                                                        int numInputFields1, 
int numInputFields2,
+                                                                               
                                                        int offset1, int 
offset2) {
+
+               DualInputSemanticProperties offsetProps = new 
DualInputSemanticProperties();
+
+               // add offset to read fields on first input
+               if(props.getReadFields(0) != null) {
+                       FieldSet offsetReadFields = new FieldSet();
+                       for(int r : props.getReadFields(0)) {
+                               offsetReadFields = 
offsetReadFields.addField(r+offset1);
+                       }
+                       offsetProps.addReadFields(0, offsetReadFields);
+               }
+               // add offset to read fields on second input
+               if(props.getReadFields(1) != null) {
+                       FieldSet offsetReadFields = new FieldSet();
+                       for(int r : props.getReadFields(1)) {
+                               offsetReadFields = 
offsetReadFields.addField(r+offset2);
+                       }
+                       offsetProps.addReadFields(1, offsetReadFields);
+               }
+
+               // add offset to forward fields on first input
+               for(int s = 0; s < numInputFields1; s++) {
+                       FieldSet targetFields = 
props.getForwardingTargetFields(0, s);
+                       for(int t : targetFields) {
+                               offsetProps.addForwardedField(0, s + offset1, 
t);
+                       }
+               }
+               // add offset to forward fields on second input
+               for(int s = 0; s < numInputFields2; s++) {
+                       FieldSet targetFields = 
props.getForwardingTargetFields(1, s);
+                       for(int t : targetFields) {
+                               offsetProps.addForwardedField(1, s + offset2, 
t);
+                       }
+               }
+
+               return offsetProps;
+       }
+
        public static SingleInputSemanticProperties getSemanticPropsSingle(
                        Set<Annotation> set, TypeInformation<?> inType, 
TypeInformation<?> outType) {
                if (set == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index a051eb0..115a238 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -40,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import 
org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -129,6 +131,29 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                return function;
        }
 
+       @Override
+       public DualInputSemanticProperties getSemanticProperties() {
+
+               DualInputSemanticProperties props = 
super.getSemanticProperties();
+
+               // offset semantic information by extracted key fields
+               if(props != null &&
+                                       (this.keys1 instanceof 
Keys.SelectorFunctionKeys ||
+                                       this.keys2 instanceof 
Keys.SelectorFunctionKeys)) {
+
+                       int numFields1 = this.getInput1Type().getTotalFields();
+                       int numFields2 = this.getInput2Type().getTotalFields();
+                       int offset1 = (this.keys1 instanceof 
Keys.SelectorFunctionKeys) ?
+                                       ((Keys.SelectorFunctionKeys) 
this.keys1).getKeyType().getTotalFields() : 0;
+                       int offset2 = (this.keys2 instanceof 
Keys.SelectorFunctionKeys) ?
+                                       ((Keys.SelectorFunctionKeys) 
this.keys2).getKeyType().getTotalFields() : 0;
+
+                       props = SemanticPropUtil.addSourceFieldOffsets(props, 
numFields1, numFields2, offset1, offset2);
+               }
+
+               return props;
+       }
+
        protected Keys<I1> getKeys1() {
                return this.keys1;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 911c608..dc26fec 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
@@ -86,6 +88,27 @@ public class GroupCombineOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OU
                return function;
        }
 
+       @Override
+       public SingleInputSemanticProperties getSemanticProperties() {
+
+               SingleInputSemanticProperties props = 
super.getSemanticProperties();
+
+               // offset semantic information by extracted key fields
+               if(props != null &&
+                               this.grouper != null &&
+                               this.grouper.keys instanceof 
Keys.SelectorFunctionKeys) {
+
+                       int offset = ((Keys.SelectorFunctionKeys) 
this.grouper.keys).getKeyType().getTotalFields();
+                       if(this.grouper instanceof SortedGrouping) {
+                               offset += ((SortedGrouping) 
this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+                       }
+
+                       props = SemanticPropUtil.addSourceFieldOffset(props, 
this.getInputType().getTotalFields(), offset);
+               }
+
+               return props;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Translation
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c96b7c6..bc4413f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
@@ -120,6 +122,26 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
                return this;
        }
 
+       @Override
+       public SingleInputSemanticProperties getSemanticProperties() {
+
+               SingleInputSemanticProperties props = 
super.getSemanticProperties();
+
+               // offset semantic information by extracted key fields
+               if(props != null &&
+                               this.grouper != null &&
+                               this.grouper.keys instanceof 
Keys.SelectorFunctionKeys) {
+
+                       int offset = ((Keys.SelectorFunctionKeys) 
this.grouper.keys).getKeyType().getTotalFields();
+                       if(this.grouper instanceof SortedGrouping) {
+                               offset += ((SortedGrouping) 
this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+                       }
+                       props = SemanticPropUtil.addSourceFieldOffset(props, 
this.getInputType().getTotalFields(), offset);
+               }
+
+               return props;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Translation
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e450ae1..4adf6b3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -225,6 +225,29 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                }
 
                @Override
+               public DualInputSemanticProperties getSemanticProperties() {
+
+                       DualInputSemanticProperties props = 
super.getSemanticProperties();
+
+                       // offset semantic information by extracted key fields
+                       if(props != null &&
+                                       (this.keys1 instanceof 
Keys.SelectorFunctionKeys ||
+                                                       this.keys2 instanceof 
Keys.SelectorFunctionKeys)) {
+
+                               int numFields1 = 
this.getInput1Type().getTotalFields();
+                               int numFields2 = 
this.getInput2Type().getTotalFields();
+                               int offset1 = (this.keys1 instanceof 
Keys.SelectorFunctionKeys) ?
+                                               ((Keys.SelectorFunctionKeys) 
this.keys1).getKeyType().getTotalFields() : 0;
+                               int offset2 = (this.keys2 instanceof 
Keys.SelectorFunctionKeys) ?
+                                               ((Keys.SelectorFunctionKeys) 
this.keys2).getKeyType().getTotalFields() : 0;
+
+                               props = 
SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, 
offset2);
+                       }
+
+                       return props;
+               }
+
+               @Override
                protected DualInputSemanticProperties 
extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
                        if (function instanceof 
DefaultJoin.WrappingFlatJoinFunction) {
                                return 
super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) 
function).getWrappedFunction().getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 5951df8..e770278 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -78,6 +80,26 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
        }
 
        @Override
+       public SingleInputSemanticProperties getSemanticProperties() {
+
+               SingleInputSemanticProperties props = 
super.getSemanticProperties();
+
+               // offset semantic information by extracted key fields
+               if(props != null &&
+                               this.grouper != null &&
+                               this.grouper.keys instanceof 
Keys.SelectorFunctionKeys) {
+
+                       int offset = ((Keys.SelectorFunctionKeys) 
this.grouper.keys).getKeyType().getTotalFields();
+                       if(this.grouper instanceof SortedGrouping) {
+                               offset += ((SortedGrouping) 
this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+                       }
+                       props = SemanticPropUtil.addSourceFieldOffset(props, 
this.getInputType().getTotalFields(), offset);
+               }
+
+               return props;
+       }
+
+       @Override
        protected org.apache.flink.api.common.operators.SingleInputOperator<?, 
IN, ?> translateToDataFlow(Operator<IN> input) {
                
                String name = getName() != null ? getName() : "Reduce at 
"+defaultName;

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 48d2d1a..a98c899 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-
+@ForwardedFields("*->2")
 public final class TwoKeyExtractingMapper<T, K1, K2> extends 
RichMapFunction<T, Tuple3<K1, K2, T>> {
 
        private static final long serialVersionUID = 1L;

Reply via email to