Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/311#discussion_r23069937 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java --- @@ -40,38 +45,108 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; public class SemanticPropUtil { - private final static String REGEX_LIST = "(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))"; - private final static String REGEX_FORWARD = "(\\s*(\\d+)\\s*->(" + REGEX_LIST + "|(\\*)))"; - private final static String REGEX_LIST_OR_FORWARD = "(" + REGEX_LIST + "|" + REGEX_FORWARD + ")"; - private final static String REGEX_ANNOTATION = "(\\s*(" + REGEX_LIST_OR_FORWARD + "\\s*;\\s*)*(" + REGEX_LIST_OR_FORWARD + "\\s*))"; + private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]"; + private final static String REGEX_SINGLE_FIELD = "[a-zA-Z0-9_\\$]+"; + private final static String REGEX_NESTED_FIELDS = "((" + REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD +")?"; + private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + ";)*(" + REGEX_NESTED_FIELDS + ");?)"; + private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS +"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))"; + private final static String REGEX_FIELD_OR_FORWARD = "(" + REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")"; + private final static String REGEX_ANNOTATION = "((" + REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)"; + + private static final Pattern PATTERN_WILDCARD = Pattern.compile(REGEX_WILDCARD); private static final Pattern 
PATTERN_FORWARD = Pattern.compile(REGEX_FORWARD); private static final Pattern PATTERN_ANNOTATION = Pattern.compile(REGEX_ANNOTATION); private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST); + private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_NESTED_FIELDS); - private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+"); - - public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields) { + public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType) + { SingleInputSemanticProperties ssp = new SingleInputSemanticProperties(); - for (int i = 0; i < fields.length; i++) { - ssp.addForwardedField(fields[i], i); + + int[] sourceOffsets = new int[inType.getArity()]; + sourceOffsets[0] = 0; + for(int i=1; i<inType.getArity(); i++) { + sourceOffsets[i] = inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1]; } + + int targetOffset = 0; + for(int i=0; i<fields.length; i++) { + int sourceOffset = sourceOffsets[fields[i]]; + int numFieldsToCopy = inType.getTypeAt(fields[i]).getTotalFields(); + + for(int j=0; j<numFieldsToCopy; j++) { + ssp.addForwardedField(sourceOffset+j, targetOffset+j); + } + targetOffset += numFieldsToCopy; + } + return ssp; } - public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) { + public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst, + TypeInformation<?> inType1, TypeInformation<?> inType2) + { DualInputSemanticProperties dsp = new DualInputSemanticProperties(); - for (int i = 0; i < fields.length; i++) { - if (isFromFirst[i]) { - dsp.addForwardedField1(fields[i], i); + int[] sourceOffsets1; + if(inType1 instanceof TupleTypeInfo<?>) { + sourceOffsets1 = new int[inType1.getArity()]; + sourceOffsets1[0] = 0; + for(int i=1; i<inType1.getArity(); i++) { + sourceOffsets1[i] = 
((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + sourceOffsets1[i-1]; + } + } else { + sourceOffsets1 = new int[] {0}; + } + + int[] sourceOffsets2; + if(inType2 instanceof TupleTypeInfo<?>) { + sourceOffsets2 = new int[inType2.getArity()]; + sourceOffsets2[0] = 0; + for(int i=1; i<inType2.getArity(); i++) { + sourceOffsets2[i] = ((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + sourceOffsets2[i-1]; + } + } else { + sourceOffsets2 = new int[] {0}; + } --- End diff -- Lines 97 - 117 look pretty copy-pasted. I think a method that takes a TupleTypeInfo and returns the sourceOffsets array would be nicer here.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---