GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r188531785
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -176,14 +179,34 @@ class DataStreamJoin(
           body,
           returnType)
     
    -    val coMapFun =
    -      new NonWindowInnerJoin(
    -        leftSchema.typeInfo,
    -        rightSchema.typeInfo,
    -        CRowTypeInfo(returnType),
    -        genFunction.name,
    -        genFunction.code,
    -        queryConfig)
    +    val coMapFun = joinType match {
    +      case JoinRelType.INNER =>
    +        new NonWindowInnerJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          queryConfig)
    +      case JoinRelType.LEFT if joinInfo.isEqui =>
    +        new NonWindowLeftRightJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          joinType == JoinRelType.LEFT,
    +          queryConfig)
    +      case JoinRelType.LEFT =>
    --- End diff --
    
    We can also do it as part of FLINK-8429.


---

Reply via email to