[ 
https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006457#comment-16006457
 ] 

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user DmytroShkvyra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3673#discussion_r115996407
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DataSetSingleRowJoinTest.scala
 ---
    @@ -187,9 +187,106 @@ class SingleRowJoinTest extends TableTestBase {
           ),
           term("where", "AND(<(a1, b1)", "=(a2, b2))"),
           term("join", "a1", "a2", "b1", "b2"),
    -      term("joinType", "NestedLoopJoin")
    +      term("joinType", "NestedLoopInnerJoin")
         )
     
         util.verifySql(query, expected)
       }
    +
    +  @Test
    +  def testSingleRowJoinLeftOuterJoin(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Long, Int)]("A", 'a1, 'a2)
    +    util.addTable[(Int, Int)]("B", 'b1, 'b2)
    +
    +    val queryLeftJoin =
    +      "SELECT a2 FROM A " +
    +        "LEFT JOIN " +
    +        "(SELECT COUNT(*) AS cnt FROM B) " +
    +        "AS x " +
    +        "ON a1 = cnt"
    +
    +    val expected =
    +      unaryNode(
    +        "DataSetCalc",
    +        unaryNode(
    +          "DataSetSingleRowJoin",
    +          batchTableNode(0),
    +          term("where", "=(a1, cnt)"),
    +          term("join", "a1", "a2", "cnt"),
    +          term("joinType", "NestedLoopLeftJoin")
    +        ),
    +        term("select", "a2")
    +      ) + "\n" +
    +        unaryNode(
    +          "DataSetAggregate",
    +          unaryNode(
    +            "DataSetUnion",
    +            unaryNode(
    +              "DataSetValues",
    +              unaryNode(
    +                "DataSetCalc",
    +                batchTableNode(1),
    +                term("select", "0 AS $f0")),
    +              tuples(List(null)), term("values", "$f0")
    +            ),
    +            term("union", "$f0")
    +          ),
    +          term("select", "COUNT(*) AS cnt")
    +        )
    +
    +    util.verifySql(queryLeftJoin, expected)
    +  }
    +
    +  @Test
    +  def testSingleRowJoinInnerJoin(): Unit = {
    +    val util = batchTestUtil()
    +    util.addTable[(Int, Int)]("A", 'a1, 'a2)
    +    val query =
    +      "SELECT a2, sum(a1) " +
    +        "FROM A " +
    +        "GROUP BY a2 " +
    +        "HAVING sum(a1) > (SELECT sum(a1) * 0.1 FROM A)"
    +
    +    val expected =
    +      unaryNode(
    +        "DataSetCalc",
    +        unaryNode(
    +          "DataSetSingleRowJoin",
    +          unaryNode(
    +            "DataSetAggregate",
    +            batchTableNode(0),
    +            term("groupBy", "a2"),
    +            term("select", "a2", "SUM(a1) AS EXPR$1")
    +          ),
    +          term("where", ">(EXPR$1, EXPR$0)"),
    +          term("join", "a2", "EXPR$1", "EXPR$0"),
    +          term("joinType", "NestedLoopInnerJoin")
    +        ),
    +        term("select", "a2", "EXPR$1")
    +      ) + "\n" +
    +        unaryNode(
    +          "DataSetCalc",
    +          unaryNode(
    +            "DataSetAggregate",
    +            unaryNode(
    +              "DataSetUnion",
    +              unaryNode(
    +                "DataSetValues",
    +                unaryNode(
    +                  "DataSetCalc",
    +                  batchTableNode(0),
    +                  term("select", "a1")
    +                ),
    +                tuples(List(null)), term("values", "a1")
    +              ),
    +              term("union", "a1")
    +            ),
    +            term("select", "SUM(a1) AS $f0")
    +          ),
    +          term("select", "*($f0, 0.1) AS EXPR$0")
    +        )
    +
    +    util.verifySql(query, expected)
    +  }
    --- End diff --
    
    Done


> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
>                 Key: FLINK-5256
>                 URL: https://issues.apache.org/jira/browse/FLINK-5256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary 
> inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and 
> right joins with a single-row input. These cases can be handled if the 
> {{DataSetSingleRowJoin}} is extended to support outer joins on the 
> non-single-row input, i.e., left joins if the right side is the single-row 
> input and vice versa.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to