[ https://issues.apache.org/jira/browse/FLINK-22099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andy updated FLINK-22099: ------------------------- Description: Fix bug for Semi/Anti WindowJoin. {code:java} //代码占位符 @Test def testSemiJoinIN(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE L.a IN ( |SELECT a FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) """.stripMargin util.verifyRelPlan(sql) } @Test def testSemiExist(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE EXISTS ( |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a) """.stripMargin util.verifyRelPlan(sql) } @Test def testAntiJoinNotExist(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE NOT EXISTS ( |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM 
TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a) """.stripMargin util.verifyRelPlan(sql) } @Test def testAntiJoinNotIN(): Unit = { val sql = """ |SELECT * FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) L WHERE L.a NOT IN ( |SELECT a FROM ( | SELECT | a, | window_start, | window_end, | window_time, | count(*) as cnt, | count(distinct c) AS uv | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) | GROUP BY a, window_start, window_end, window_time |) R |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) """.stripMargin util.verifyRelPlan(sql) }{code} Running the above SQL currently throws an `ArrayIndexOutOfBoundsException`. > Fix bug for semi/anti window join. > ---------------------------------- > > Key: FLINK-22099 > URL: https://issues.apache.org/jira/browse/FLINK-22099 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: Andy > Assignee: Andy > Priority: Minor > Labels: pull-request-available > Fix For: 1.13.0 > > > Fix bug for Semi/Anti WindowJoin. 
> {code:java} > //代码占位符 > @Test > def testSemiJoinIN(): Unit = { > val sql = > """ > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) L WHERE L.a IN ( > |SELECT a FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) R > |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) > """.stripMargin > util.verifyRelPlan(sql) > } > @Test > def testSemiExist(): Unit = { > val sql = > """ > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) L WHERE EXISTS ( > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) R > |WHERE L.window_start = R.window_start AND L.window_end = R.window_end > AND L.a = R.a) > """.stripMargin > util.verifyRelPlan(sql) > } > @Test > def testAntiJoinNotExist(): Unit = { > val sql = > """ > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) L WHERE NOT EXISTS ( > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | 
window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) R > |WHERE L.window_start = R.window_start AND L.window_end = R.window_end > AND L.a = R.a) > """.stripMargin > util.verifyRelPlan(sql) > } > @Test > def testAntiJoinNotIN(): Unit = { > val sql = > """ > |SELECT * FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) L WHERE L.a NOT IN ( > |SELECT a FROM ( > | SELECT > | a, > | window_start, > | window_end, > | window_time, > | count(*) as cnt, > | count(distinct c) AS uv > | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' > MINUTE)) > | GROUP BY a, window_start, window_end, window_time > |) R > |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) > """.stripMargin > util.verifyRelPlan(sql) > }{code} > Running the above SQL currently throws an `ArrayIndexOutOfBoundsException`. -- This message was sent by Atlassian Jira (v8.3.4#803005)