Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23069937
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
 ---
    @@ -40,38 +45,108 @@
     import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
     import 
org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
     import 
org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
    +import org.apache.flink.api.java.operators.Keys;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
     
     public class SemanticPropUtil {
     
    -   private final static String REGEX_LIST = 
"(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))";
    -   private final static String REGEX_FORWARD = "(\\s*(\\d+)\\s*->(" + 
REGEX_LIST + "|(\\*)))";
    -   private final static String REGEX_LIST_OR_FORWARD = "(" + REGEX_LIST + 
"|" + REGEX_FORWARD + ")";
    -   private final static String REGEX_ANNOTATION = "(\\s*(" + 
REGEX_LIST_OR_FORWARD + "\\s*;\\s*)*(" + REGEX_LIST_OR_FORWARD + "\\s*))";
    +   private final static String REGEX_WILDCARD = "[\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
    +   private final static String REGEX_SINGLE_FIELD = "[a-zA-Z0-9_\\$]+";
    +   private final static String REGEX_NESTED_FIELDS = "((" + 
REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD 
+")?";
     
    +   private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + 
";)*(" + REGEX_NESTED_FIELDS + ");?)";
    +   private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS 
+"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))";
    +   private final static String REGEX_FIELD_OR_FORWARD = "(" + 
REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")";
    +   private final static String REGEX_ANNOTATION = "((" + 
REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)";
    +
    +   private static final Pattern PATTERN_WILDCARD = 
Pattern.compile(REGEX_WILDCARD);
        private static final Pattern PATTERN_FORWARD = 
Pattern.compile(REGEX_FORWARD);
        private static final Pattern PATTERN_ANNOTATION = 
Pattern.compile(REGEX_ANNOTATION);
        private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST);
    +   private static final Pattern PATTERN_FIELD = 
Pattern.compile(REGEX_NESTED_FIELDS);
     
    -   private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+");
    -
    -   public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields) {
    +   public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType)
    +   {
                SingleInputSemanticProperties ssp = new 
SingleInputSemanticProperties();
    -           for (int i = 0; i < fields.length; i++) {
    -                   ssp.addForwardedField(fields[i], i);
    +
    +           int[] sourceOffsets = new int[inType.getArity()];
    +           sourceOffsets[0] = 0;
    +           for(int i=1; i<inType.getArity(); i++) {
    +                   sourceOffsets[i] = 
inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1];
                }
    +
    +           int targetOffset = 0;
    +           for(int i=0; i<fields.length; i++) {
    +                   int sourceOffset = sourceOffsets[fields[i]];
    +                   int numFieldsToCopy = 
inType.getTypeAt(fields[i]).getTotalFields();
    +
    +                   for(int j=0; j<numFieldsToCopy; j++) {
    +                           ssp.addForwardedField(sourceOffset+j, 
targetOffset+j);
    +                   }
    +                   targetOffset += numFieldsToCopy;
    +           }
    +
                return ssp;
        }
     
    -   public static DualInputSemanticProperties 
createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) {
    +   public static DualInputSemanticProperties 
createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst,
    +                                                                           
                                                                                
TypeInformation<?> inType1, TypeInformation<?> inType2)
    +   {
                DualInputSemanticProperties dsp = new 
DualInputSemanticProperties();
     
    -           for (int i = 0; i < fields.length; i++) {
    -                   if (isFromFirst[i]) {
    -                           dsp.addForwardedField1(fields[i], i);
    +           int[] sourceOffsets1;
    +           if(inType1 instanceof TupleTypeInfo<?>) {
    +                   sourceOffsets1 = new int[inType1.getArity()];
    +                   sourceOffsets1[0] = 0;
    +                   for(int i=1; i<inType1.getArity(); i++) {
    +                           sourceOffsets1[i] = 
((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + 
sourceOffsets1[i-1];
    +                   }
    +           } else {
    +                   sourceOffsets1 = new int[] {0};
    +           }
    +
    +           int[] sourceOffsets2;
    +           if(inType2 instanceof TupleTypeInfo<?>) {
    +                   sourceOffsets2 = new int[inType2.getArity()];
    +                   sourceOffsets2[0] = 0;
    +                   for(int i=1; i<inType2.getArity(); i++) {
    +                           sourceOffsets2[i] = 
((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + 
sourceOffsets2[i-1];
    +                   }
    +           } else {
    +                   sourceOffsets2 = new int[] {0};
    +           }
    --- End diff --
    
    Lines 97-117 look pretty copy-pasted. I think a helper method that takes a 
TupleTypeInfo and returns the source offsets array would be nicer here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to