[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Terry Wang updated FLINK-17553: ------------------------------- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging into this bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. The problem is WindowPropertiesRule can not match RelNodeTree after the transformation of AggregateProjectPullUpConstantsRule, we also can add ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. was: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging the bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > --------------------------------------------------------------------------------------------------------- > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Reporter: Terry Wang > Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! > We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND > |) WITH ( > | 'connector' = 'COLLECTION', > | 'is-bounded' = 'false' > |) > """.stripMargin > val ddl2 = > """ > |CREATE TABLE dst ( > | ts TIMESTAMP(3), > | a BIGINT, > | b DOUBLE > |) WITH ( > | 'connector.type' = 'filesystem', > | 'connector.path' = '/tmp/1', > | 'format.type' = 'csv' > |) > """.stripMargin > val query = > """ > |INSERT INTO dst > |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), > SUM(b) > |FROM src > | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) > |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) > """.stripMargin > tEnv.sqlUpdate(ddl1) > tEnv.sqlUpdate(ddl2) > tEnv.sqlUpdate(query) > println(tEnv.explain(true)) > } > {code} > I spent lots of work digging into this bug, and found the problem may be > caused by AggregateProjectPullUpConstantsRule which doesn't generate proper > project items correctly. > After I remove AggregateProjectPullUpConstantsRule from > FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. > The problem is WindowPropertiesRule can not match RelNodeTree after the > transformation of AggregateProjectPullUpConstantsRule, we also can add > ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after > AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)