[ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
-------------------------------
    Description: 
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
    val ddl1 =
      """
        |CREATE TABLE src (
        |  log_ts STRING,
        |  ts TIMESTAMP(3),
        |  a INT,
        |  b DOUBLE,
        |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
        |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
        |) WITH (
        |  'connector' = 'COLLECTION',
        |  'is-bounded' = 'false'
        |)
      """.stripMargin
    val ddl2 =
      """
        |CREATE TABLE dst (
        |  ts TIMESTAMP(3),
        |  a BIGINT,
        |  b DOUBLE
        |) WITH (
        |  'connector.type' = 'filesystem',
        |  'connector.path' = '/tmp/1',
        |  'format.type' = 'csv'
        |)
      """.stripMargin
    val query =
      """
        |INSERT INTO dst
        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
        |FROM src
        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
        |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
      """.stripMargin
    tEnv.sqlUpdate(ddl1)
    tEnv.sqlUpdate(ddl2)
    tEnv.sqlUpdate(query)
    println(tEnv.explain(true))
  }

{code}







I spent lots of work digging into this bug, and found the problem may be caused 
by AggregateProjectPullUpConstantsRule which doesn't generate proper project 
items correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.

The problem is WindowPropertiesRule can not match RelNodeTree after the 
transformation of AggregateProjectPullUpConstantsRule, we also can add 
ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.













  was:
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
    val ddl1 =
      """
        |CREATE TABLE src (
        |  log_ts STRING,
        |  ts TIMESTAMP(3),
        |  a INT,
        |  b DOUBLE,
        |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
        |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
        |) WITH (
        |  'connector' = 'COLLECTION',
        |  'is-bounded' = 'false'
        |)
      """.stripMargin
    val ddl2 =
      """
        |CREATE TABLE dst (
        |  ts TIMESTAMP(3),
        |  a BIGINT,
        |  b DOUBLE
        |) WITH (
        |  'connector.type' = 'filesystem',
        |  'connector.path' = '/tmp/1',
        |  'format.type' = 'csv'
        |)
      """.stripMargin
    val query =
      """
        |INSERT INTO dst
        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
        |FROM src
        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
        |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
      """.stripMargin
    tEnv.sqlUpdate(ddl1)
    tEnv.sqlUpdate(ddl2)
    tEnv.sqlUpdate(query)
    println(tEnv.explain(true))
  }

{code}







I spent lots of work digging the bug, and found the problem may be caused by 
AggregateProjectPullUpConstantsRule which doesn't generate proper project items 
correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.














> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17553
>                 URL: https://issues.apache.org/jira/browse/FLINK-17553
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>         Attachments: temp.png
>
>
> Exception stack is as following:
>  !temp.png! 
> We can reproduce this problem by add following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
>     val ddl1 =
>       """
>         |CREATE TABLE src (
>         |  log_ts STRING,
>         |  ts TIMESTAMP(3),
>         |  a INT,
>         |  b DOUBLE,
>         |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
>         |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
>         |) WITH (
>         |  'connector' = 'COLLECTION',
>         |  'is-bounded' = 'false'
>         |)
>       """.stripMargin
>     val ddl2 =
>       """
>         |CREATE TABLE dst (
>         |  ts TIMESTAMP(3),
>         |  a BIGINT,
>         |  b DOUBLE
>         |) WITH (
>         |  'connector.type' = 'filesystem',
>         |  'connector.path' = '/tmp/1',
>         |  'format.type' = 'csv'
>         |)
>       """.stripMargin
>     val query =
>       """
>         |INSERT INTO dst
>         |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), 
> SUM(b)
>         |FROM src
>         | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>         |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>       """.stripMargin
>     tEnv.sqlUpdate(ddl1)
>     tEnv.sqlUpdate(ddl2)
>     tEnv.sqlUpdate(query)
>     println(tEnv.explain(true))
>   }
> {code}
> I spent lots of work digging into this bug, and found the problem may be 
> caused by AggregateProjectPullUpConstantsRule which doesn't generate proper 
> project items correctly.
> After I remove AggregateProjectPullUpConstantsRule from 
> FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.
> The problem is WindowPropertiesRule can not match RelNodeTree after the 
> transformation of AggregateProjectPullUpConstantsRule, we also can add 
> ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
> AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to