GitHub user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1981#discussion_r64714427
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -269,22 +269,57 @@ case class Join(
         condition: Option[Expression]) extends BinaryNode {
     
       override def output: Seq[Attribute] = {
    -    joinType match {
    -      case JoinType.INNER => left.output ++ right.output
    -      case j => throw new ValidationException(s"Unsupported JoinType: $j")
    +    left.output ++ right.output
    +  }
    +
    +  private case class JoinFieldReference(
    +    name: String,
    +    resultType: TypeInformation[_],
    +    left: LogicalNode,
    +    right: LogicalNode) extends Attribute {
    +
    +    override def toString = s"'$name"
    +
    +    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +      val joinInputField = left.output.zipWithIndex.find(_._1.name == name)
    +                           .orElse(
    +                             right.output.zipWithIndex.find(_._1.name == name)
    +                             .map(x => (x._1, left.output.length + x._2))).get
    +
    +      new RexInputRef(joinInputField._2,
    --- End diff --
    
    Regarding `RelBuilder.field`, I also thought it worked the way you 
described, but unfortunately it does not in the case of a join. 
`RelBuilder.field` returns an index into the separate inputs, while in a `Join` 
operation the reference should point into the input that is the concatenation 
of both inputs. That is why I compute the index myself.
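
    To make the index computation concrete, here is a minimal standalone 
sketch of the same lookup over plain field-name lists. The `JoinFieldIndex` 
object and its `resolve` method are hypothetical names used only for 
illustration and are not part of the PR; the sketch assumes field names are 
unique enough that the first match (left input first) is the intended one.

```scala
// Hypothetical helper, not part of the PR: resolves a field name to its
// position in the row formed by concatenating the left and right inputs.
object JoinFieldIndex {
  def resolve(left: Seq[String], right: Seq[String], name: String): Option[Int] = {
    val inLeft = left.indexOf(name)
    if (inLeft >= 0) {
      // Fields of the left input keep their original positions.
      Some(inLeft)
    } else {
      val inRight = right.indexOf(name)
      // Fields of the right input are shifted by the width of the left input.
      if (inRight >= 0) Some(left.length + inRight) else None
    }
  }
}
```

With left fields `a, b` and right fields `c, d`, the field `c` has index 0 
within the right input but index 2 in the concatenated row, which is the value 
the join needs. Unlike the `.get` in the diff, this sketch returns `None` for 
an unknown name instead of throwing.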

