[ https://issues.apache.org/jira/browse/FLINK-19271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nicholas Jiang updated FLINK-19271:
-----------------------------------
    Description:
HOP_PROCTIME is greater than HOP_END in the following case.

The reason is that we materialize the processing time (HOP_PROCTIME) downstream of `WindowAggregate` rather than inside `WindowAggregate`, so HOP_PROCTIME is always greater than HOP_END.

I believe the same problem exists for TUMBLE_PROCTIME and SESSION_PROCTIME. We should use `HOP_END - 1` as the HOP_PROCTIME value when the proctime needs to be materialized.

*WindowAggregateITCase*
{code:java}
@Test
def testEventTimeSlidingWindowProcTime(): Unit = {
  val stream = failingDataSource(data)
    .assignTimestampsAndWatermarks(
      new TimestampAndWatermarkWithOffset
        [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
  // 'proctime is appended as a processing-time attribute
  val table = stream.toTable(tEnv,
    'a, 'int, 'double, 'float, 'bigdec, 'string, 'name, 'proctime.proctime())
  tEnv.registerTable("T1", table)

  // HOP(time_attr, slide = 4 ms, size = 5 ms)
  val sql =
    """
      |SELECT
      |  HOP_START(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
      |  HOP_END(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
      |  HOP_PROCTIME(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
      |FROM T1
      |GROUP BY `string`, HOP(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
    """.stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
  env.execute()

  // In every row, HOP_PROCTIME (third column) is later than HOP_END (second column)
  val expected = Seq(
    "2020-09-17T07:13:40.348, 2020-09-17T07:13:40.353, 2020-09-17T07:13:43.479",
    "2020-09-17T07:13:40.348, 2020-09-17T07:13:40.353, 2020-09-17T07:13:43.479",
    "2020-09-17T07:13:40.352, 2020-09-17T07:13:40.357, 2020-09-17T07:13:44.030")
  assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
{code}

> wrong HOP_PROCTIME output when materialize proctime
> ---------------------------------------------------
>
>                 Key: FLINK-19271
>                 URL: https://issues.apache.org/jira/browse/FLINK-19271
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Leonard Xu
>            Priority: Major
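The proposed `HOP_END - 1` materialization can be illustrated with a small, self-contained Scala sketch. This is not Flink's implementation: `HopProctimeSketch`, `Window`, `lastWindowStart`, `assignHopWindows` and `materializeFromWindow` are hypothetical names, the window arithmetic assumes epoch-aligned windows with zero offset, the record timestamp `07:13:40.350` is made up for illustration, and the timestamps from the expected rows in the test are interpreted as UTC only so they can be turned into epoch milliseconds. The sketch contrasts the wall-clock value read downstream of the aggregate with a value derived from the window itself.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Illustrative sketch only: the names below are hypothetical and are not Flink APIs.
object HopProctimeSketch {

  // A time window covering [start, end); `end` plays the role of HOP_END.
  final case class Window(start: Long, end: Long) {
    // Largest timestamp that still belongs to the window, i.e. end - 1.
    def maxTimestamp: Long = end - 1
  }

  // Start of the latest slide-aligned window containing `ts` (epoch-aligned, offset 0).
  def lastWindowStart(ts: Long, slide: Long): Long = ts - Math.floorMod(ts, slide)

  // All HOP (sliding) windows with the given slide and size that contain `ts`.
  def assignHopWindows(ts: Long, slide: Long, size: Long): List[Window] =
    Iterator
      .iterate(lastWindowStart(ts, slide))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => Window(start, start + size))
      .toList

  // Proposed materialization: derive the proctime value from the window itself
  // (HOP_END - 1) instead of reading the wall clock in a downstream operator.
  def materializeFromWindow(w: Window): Long = w.maxTimestamp

  // Parses the timestamps printed in the issue, treating them as UTC for illustration.
  def toMillis(s: String): Long =
    LocalDateTime.parse(s).toInstant(ZoneOffset.UTC).toEpochMilli

  def main(args: Array[String]): Unit = {
    // HOP(proctime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND): slide 4 ms, size 5 ms.
    val ts = toMillis("2020-09-17T07:13:40.350") // hypothetical record timestamp
    val w = assignHopWindows(ts, slide = 4L, size = 5L).head // [07:13:40.348, 07:13:40.353)

    // Wall-clock value reported as HOP_PROCTIME in the first expected row of the test.
    val downstreamProctime = toMillis("2020-09-17T07:13:43.479")
    val proposed = materializeFromWindow(w)

    println(s"window: [${w.start}, ${w.end})")
    println(s"downstream proctime > HOP_END: ${downstreamProctime > w.end}") // true
    println(s"HOP_END - 1 inside window: ${proposed >= w.start && proposed < w.end}") // true
  }
}
```

Running `main` prints `true` for both checks. Deriving the value from the window keeps the materialized proctime deterministic and never later than HOP_END; Flink's own `TimeWindow` likewise defines its max timestamp as `end - 1`.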