[ 
https://issues.apache.org/jira/browse/FLINK-22099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy updated FLINK-22099:
-------------------------
    Description: 
Fix bug for Semi/Anti WindowJoin.
{code:java}
// code placeholder
/**
 * Verifies the relational plan of a query that keeps rows of the tumbling-window
 * aggregate over `MyTable` whose `a` value appears (`IN`) in the tumbling-window
 * aggregate over `MyTable2`, with the sub-query correlated on `window_start` and
 * `window_end`.
 */
@Test
def testSemiJoinIN(): Unit = {
  // The SQL text is kept verbatim; only lines starting with '|' are affected by stripMargin.
  val query =
    """
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) L WHERE L.a IN (
      |SELECT a FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) R
      |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
    """.stripMargin
  util.verifyRelPlan(query)
}

/**
 * Verifies the relational plan of a query that keeps rows of the tumbling-window
 * aggregate over `MyTable` for which a matching row `EXISTS` in the tumbling-window
 * aggregate over `MyTable2`, correlated on `window_start`, `window_end` and `a`.
 */
@Test
def testSemiExist(): Unit = {
  // The SQL text is kept verbatim; only lines starting with '|' are affected by stripMargin.
  val query =
    """
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) L WHERE EXISTS (
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) R
      |WHERE L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a)
    """.stripMargin
  util.verifyRelPlan(query)
}

/**
 * Verifies the relational plan of a query that keeps rows of the tumbling-window
 * aggregate over `MyTable` for which no matching row exists (`NOT EXISTS`) in the
 * tumbling-window aggregate over `MyTable2`, correlated on `window_start`,
 * `window_end` and `a`.
 */
@Test
def testAntiJoinNotExist(): Unit = {
  // The SQL text is kept verbatim; only lines starting with '|' are affected by stripMargin.
  val query =
    """
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) L WHERE NOT EXISTS (
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) R
      |WHERE L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a)
    """.stripMargin
  util.verifyRelPlan(query)
}

/**
 * Verifies the relational plan of a query that keeps rows of the tumbling-window
 * aggregate over `MyTable` whose `a` value does not appear (`NOT IN`) in the
 * tumbling-window aggregate over `MyTable2`, with the sub-query correlated on
 * `window_start` and `window_end`.
 */
@Test
def testAntiJoinNotIN(): Unit = {
  // The SQL text is kept verbatim; only lines starting with '|' are affected by stripMargin.
  val query =
    """
      |SELECT * FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) L WHERE L.a NOT IN (
      |SELECT a FROM (
      |  SELECT
      |    a,
      |    window_start,
      |    window_end,
      |    window_time,
      |    count(*) as cnt,
      |    count(distinct c) AS uv
      |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
      |  GROUP BY a, window_start, window_end, window_time
      |) R
      |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
    """.stripMargin
  util.verifyRelPlan(query)
}{code}
Currently, running the above SQL throws an `ArrayIndexOutOfBoundsException`.

> Fix bug for semi/anti window join.
> ----------------------------------
>
>                 Key: FLINK-22099
>                 URL: https://issues.apache.org/jira/browse/FLINK-22099
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: Andy
>            Assignee: Andy
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>
> Fix bug for Semi/Anti WindowJoin.
> {code:java}
> // code placeholder
> @Test
> def testSemiJoinIN(): Unit = {
>   val sql =
>     """
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) L WHERE L.a IN (
>       |SELECT a FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) R
>       |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
>     """.stripMargin
>   util.verifyRelPlan(sql)
> }
> @Test
> def testSemiExist(): Unit = {
>   val sql =
>     """
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) L WHERE EXISTS (
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) R
>       |WHERE L.window_start = R.window_start AND L.window_end = R.window_end 
> AND L.a = R.a)
>     """.stripMargin
>   util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotExist(): Unit = {
>   val sql =
>     """
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) L WHERE NOT EXISTS (
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) R
>       |WHERE L.window_start = R.window_start AND L.window_end = R.window_end 
> AND L.a = R.a)
>     """.stripMargin
>   util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotIN(): Unit = {
>   val sql =
>     """
>       |SELECT * FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) L WHERE L.a NOT IN (
>       |SELECT a FROM (
>       |  SELECT
>       |    a,
>       |    window_start,
>       |    window_end,
>       |    window_time,
>       |    count(*) as cnt,
>       |    count(distinct c) AS uv
>       |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' 
> MINUTE))
>       |  GROUP BY a, window_start, window_end, window_time
>       |) R
>       |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
>     """.stripMargin
>   util.verifyRelPlan(sql)
> }{code}
> Currently, running the above SQL throws an `ArrayIndexOutOfBoundsException`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to