[ 
https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004717#comment-16004717
 ] 

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3673#discussion_r115746347
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
 ---
    @@ -187,9 +187,155 @@ class SingleRowJoinTest extends TableTestBase {
           ),
           term("where", "AND(<(a1, b1)", "=(a2, b2))"),
           term("join", "a1", "a2", "b1", "b2"),
    -      term("joinType", "NestedLoopJoin")
    +      term("joinType", "NestedLoopInnerJoin")
         )
     
         util.verifySql(query, expected)
       }
    +
    +  @Test
    +  def testSingleRowJoinLeftOuterJoin(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Long, Int)]("A", 'a1, 'a2)
    +    util.addTable[(Int, Int)]("B", 'b1, 'b2)
    +
    +    val queryLeftJoin =
    +      "SELECT a2 FROM A " +
    +        "LEFT JOIN " +
    +        "(SELECT COUNT(*) AS cnt FROM B) " +
    +        "AS x " +
    +        "ON a1 = cnt"
    +
    +    val expected =
    +      unaryNode(
    +        "DataSetCalc",
    +        unaryNode(
    +          "DataSetSingleRowJoin",
    +          batchTableNode(0),
    +          term("where", "=(a1, cnt)"),
    +          term("join", "a1", "a2", "cnt"),
    +          term("joinType", "NestedLoopLeftJoin")
    +        ),
    +        term("select", "a2")
    +      ) + "\n" +
    +        unaryNode(
    +          "DataSetAggregate",
    +          unaryNode(
    +            "DataSetUnion",
    +            unaryNode(
    +              "DataSetValues",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(1),
    +                term("select", "0 AS $f0")),
    +              tuples(List(null)), term("values", "$f0")
    +            ),
    +            term("union", "$f0")
    +          ),
    +          term("select", "COUNT(*) AS cnt")
    +        )
    +
    +    util.verifySql(queryLeftJoin, expected)
    +  }
    +
    +  @Test
    +  def testSingleRowJoinRightOuterJoin(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Long, Int)]("A", 'a1, 'a2)
    +    util.addTable[(Int, Int)]("B", 'b1, 'b2)
    +
    +    val queryRightJoin =
    --- End diff --
    
    The generated join is a `RightOuterJoin`, not a `SingleRowJoin`, which is 
what this test should verify. 
    
    We had to disable outer joins with predicates that include non-equi 
conditions in FLINK-5520 because they were not properly implemented. That 
implementation was based on splitting the join predicate into equi-conditions 
which were evaluated by the join and non-equi-conditions which were evaluated 
in a subsequent filter step. However, this split did not work correctly, 
because it would generate too many `null` rows if records passed the equi-join 
predicate in the join but not the non-equi predicate in the filter (since each 
filter call saw only a single row and could not know whether all other rows had 
been filtered as well).
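
    To make the problem concrete, here is a minimal sketch on plain Scala 
collections. It is not the code that was removed in FLINK-5520; the names 
`L`, `R`, `equi`, `nonEqui`, `correctLeftJoin` and `splitLeftJoin` are 
hypothetical, and the filter step is a simplified model of the behavior 
described above (a row that fails the non-equi predicate is null-padded 
without knowing about its sibling rows).

```scala
object SplitPredicateSketch {
  // Hypothetical row types for the left and right input.
  final case class L(k: Int, v: Int)
  final case class R(k: Int, v: Int)

  // The join predicate is: equi(l, r) AND nonEqui(l, r).
  def equi(l: L, r: R): Boolean = l.k == r.k
  def nonEqui(l: L, r: R): Boolean = l.v < r.v

  // Reference semantics of a left outer join on the full predicate:
  // a left row is null-padded only if NO right row matches.
  def correctLeftJoin(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches = right.filter(r => equi(l, r) && nonEqui(l, r))
      if (matches.isEmpty) Seq((l, Option.empty[R]))
      else matches.map(r => (l, Option(r)))
    }

  // Simplified model of the split approach (an assumption, see lead-in):
  // step 1 is an outer join on the equi-condition only, step 2 looks at one
  // joined row at a time and null-pads it if the non-equi condition fails.
  def splitLeftJoin(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] = {
    val joined: Seq[(L, Option[R])] = left.flatMap { l =>
      val matches = right.filter(r => equi(l, r))
      if (matches.isEmpty) Seq((l, Option.empty[R]))
      else matches.map(r => (l, Option(r)))
    }
    joined.map {
      case (l, Some(r)) if !nonEqui(l, r) => (l, Option.empty[R])
      case row                            => row
    }
  }

  // One left row with two equi-matches, only one of which passes nonEqui.
  val left: Seq[L]  = Seq(L(1, 10))
  val right: Seq[R] = Seq(R(1, 5), R(1, 20))
}
```

    With these inputs, `correctLeftJoin` yields only the matching pair, while 
`splitLeftJoin` additionally yields a null-padded copy of the left row, which 
is the "too many `null` rows" effect.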
    
    In our case the situation is different. We are translating the join into a 
`NestedLoopJoin` (where one side is at most one record), which can evaluate the 
full predicate, including the non-equi conditions, inside the join and knows 
whether it needs to emit a `null` result, because there is only a single row 
that either matches the predicate or does not.
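
    The single-row case can be sketched on plain Scala collections as follows. 
This is only an illustration of the argument, not the `DataSetSingleRowJoin` 
implementation; `leftJoinSingleRow` and the sample data are hypothetical, and 
the single-row side is modeled as an `Option`.

```scala
object SingleRowLeftJoinSketch {
  // Left outer join of `left` against an input that has at most one row.
  // The full predicate (equi and non-equi parts alike) is evaluated inside
  // the join, so each left row is emitted exactly once: either with the
  // single row, or null-padded (None) if the predicate does not hold.
  def leftJoinSingleRow[A, B](left: Seq[A], single: Option[B])(
      pred: (A, B) => Boolean): Seq[(A, Option[B])] =
    left.map(a => (a, single.filter(b => pred(a, b))))

  // Rows of a table A(a1, a2) and the single result of a COUNT(*) subquery.
  val tableA: Seq[(Long, Int)] = Seq((1L, 10), (5L, 50))
  val cnt: Option[Long]        = Some(3L)

  // Non-equi join condition: a1 < cnt.
  val result: Seq[((Long, Int), Option[Long])] =
    leftJoinSingleRow(tableA, cnt)((row, c) => row._1 < c)
}
```

    Because there is only one candidate row per left record, the decision to 
null-pad is local to a single predicate evaluation, so no left row can be 
duplicated or padded more than once.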


> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
>                 Key: FLINK-5256
>                 URL: https://issues.apache.org/jira/browse/FLINK-5256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary 
> inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and 
> right joins with single input. These cases can be handled if the  
> {{DataSetSingleRowJoin}} is extended to support outer joins on the 
> non-single-row input, i.e., left joins if the right side is single input and 
> vice versa.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
