Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439571 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) { } @Override - protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + protected AbstractJoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder<OUT> builder = new JoinOperatorBaseBuilder<OUT>(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - final JoinOperatorBase<?, ?, OUT, ?> translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. + builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; - - PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, - getInput1Type(), getInput2Type(), getResultType(), name, input1, input2); - - // set parallelism - po.setParallelism(this.getParallelism()); - - translated = po; + builder = builder + .withUdf(function) + .withInput1(input1, getInput1Type(), keys1) + .withInput2(input2, getInput2Type(), keys2); + } else { + throw new UnsupportedOperationException("Unrecognized or incompatible key types."); } - else if (keys2 instanceof Keys.SelectorFunctionKeys) { - // The right side of the join needs the tuple wrapping/unwrapping - int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); + return builder.build(); + } - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = - (Keys.SelectorFunctionKeys<I2, ?>) keys2; - PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2, - function, getInput1Type(), getInput2Type(), getResultType(), name, - input1, input2); + private static final class JoinOperatorBaseBuilder<OUT> { + + private final String name; + private final JoinType joinType; + + private int parallelism; + private FlatJoinFunction udf; + private TypeInformation<OUT> resultType; + + private Operator input1; + private TypeInformation input1Type; + private Keys<?> keys1; + + private Operator input2; + private TypeInformation input2Type; + private Keys<?> keys2; - // set parallelism - po.setParallelism(this.getParallelism()); + private Partitioner<?> partitioner; + private JoinHint joinHint; - translated = po; + public JoinOperatorBaseBuilder(String name, JoinType joinType) { + this.name = name; + this.joinType = joinType; } - else if (keys1 instanceof Keys.SelectorFunctionKeys) { - // The left side of the join needs the tuple wrapping/unwrapping + public JoinOperatorBaseBuilder<OUT> with() { --- End diff -- This method is not used and can be removed?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---