Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r170296184
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala ---
    @@ -149,45 +148,39 @@ object UpdatingPlanChecker {
               }
     
             case j: DataStreamJoin =>
    -          val joinType = j.getJoinType
    -          joinType match {
    -            case JoinRelType.INNER =>
    -              // get key(s) for inner join
    -              val lInKeys = visit(j.getLeft)
    -              val rInKeys = visit(j.getRight)
    -              if (lInKeys.isEmpty || rInKeys.isEmpty) {
    -                None
    -              } else {
    -                // Output of inner join must have keys if left and right both contain key(s).
    -                // Key groups from both side will be merged by join equi-predicates
    -                val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
    -                val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
    -                val joinNames = j.getRowType.getFieldNames
    -
    -                // if right field names equal to left field names, calcite will rename right
    -                // field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
    -                // to T2(pk0, b).
    -                val rInNamesToJoinNamesMap = rInNames
    -                  .zip(joinNames.subList(lInNames.size, joinNames.length))
    -                  .toMap
    +          // get key(s) for inner join
    +          val lInKeys = visit(j.getLeft)
    +          val rInKeys = visit(j.getRight)
    +          if (lInKeys.isEmpty || rInKeys.isEmpty) {
    +            None
    +          } else {
    +            // Output of inner join must have keys if left and right both contain key(s).
    --- End diff --
    
    remove "inner"?


---

Reply via email to