[ 
https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20909.
---------------------------
    Resolution: Fixed

Fixed in master: d1d78e72220e2938bf5af77420af16c477c96e29

> MiniBatch Interval derivation does not work well when enable miniBatch 
> optimization in a job which contains deduplicate on row and unbounded 
> aggregate.
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20909
>                 URL: https://issues.apache.org/jira/browse/FLINK-20909
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Andy
>            Assignee: Andy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>
> MiniBatch interval derivation does not work correctly when MiniBatch 
> optimization is enabled in a job that contains a deduplicate on rowtime and 
> an unbounded aggregate.
> {code:java}
> @Test
> def testLastRowOnRowtime1(): Unit = {
>   val t = env.fromCollection(rowtimeTestData)
>     .assignTimestampsAndWatermarks(new RowtimeExtractor)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
>   tEnv.registerTable("T", t)
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE rowtime_sink (
>        |    cnt BIGINT
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'changelog-mode' = 'I,UA,D'
>        |)
>        |""".stripMargin)
>   val sql =
>     """
>       |INSERT INTO rowtime_sink
>       |SELECT COUNT(b) FROM (
>       | SELECT a, b, c, rowtime
>       | FROM (
>       |   SELECT *,
>       |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
>       |   FROM T
>       | )
>       | WHERE rowNum = 1
>       | )
>     """.stripMargin
>   tEnv.executeSql(sql).await()
>   val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
> }{code}
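> MiniBatch optimization is disabled by default. A minimal sketch of how the 
> test above might enable it, assuming the same `tEnv`: the 1 s latency is 
> chosen to match the 1000ms interval shown in the plan below, and the batch 
> size of 5000 is an arbitrary illustrative value, not taken from this issue.
> {code:java}
> // Assumption: tEnv is the StreamTableEnvironment used by the test above.
> val conf = tEnv.getConfig.getConfiguration
> // Turn MiniBatch optimization on.
> conf.setString("table.exec.mini-batch.enabled", "true")
> // Maximum time records may be buffered before a batch is flushed.
> conf.setString("table.exec.mini-batch.allow-latency", "1 s")
> // Maximum number of buffered records per batch (illustrative value).
> conf.setString("table.exec.mini-batch.size", "5000"){code}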
> For example, for the SQL above, when MiniBatch optimization is enabled, the 
> optimized plan is as follows.
> {code:java}
> Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
> +- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
>    +- Exchange(distribution=[single])
>       +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
>          +- Calc(select=[b])
>             +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
>                +- Exchange(distribution=[hash[b]])
>                   +- Calc(select=[b, rowtime])
>                      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                         +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
> A `StreamExecMiniBatchAssigner` is inserted in processing-time mode. This is 
> problematic because `Deduplicate` orders by rowtime, whereas 
> `ProcTimeMiniBatchAssignerOperator` emits a watermark at every configured 
> interval based on processing time. For `Deduplicate`, such a watermark is 
> unrelated to the rowtime of the incoming records, so it cannot guarantee 
> that the rowtime of every subsequent input record is greater than or equal 
> to the current watermark.
>  
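
The mismatch described in the issue can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Record`, `Watermark`, `assignByProcTime` and `lateRecords` are hypothetical stand-ins written for this illustration, not Flink classes, and the assigner is a simplified approximation of a processing-time mini-batch assigner that derives its watermark from the wall clock alone.

```scala
// Toy model only: these types are hypothetical stand-ins, not Flink classes.
sealed trait Element
final case class Record(key: String, rowtime: Long) extends Element
final case class Watermark(timestamp: Long) extends Element

object ProcTimeWatermarkDemo {

  // Simplified stand-in for a processing-time mini-batch assigner: before
  // forwarding a record, emit a watermark whenever the wall clock has entered
  // a new interval. The record's rowtime is never consulted.
  def assignByProcTime(arrivals: Seq[(Long, Record)], intervalMs: Long): Seq[Element] = {
    var currentWatermark = Long.MinValue
    arrivals.flatMap { case (procTime, record) =>
      val batchStart = procTime - procTime % intervalMs
      if (batchStart > currentWatermark) {
        currentWatermark = batchStart
        Seq[Element](Watermark(batchStart), record)
      } else {
        Seq[Element](record)
      }
    }
  }

  // Records whose rowtime is smaller than a watermark that was already seen,
  // i.e. records an event-time operator would assume can no longer arrive.
  def lateRecords(stream: Seq[Element]): Seq[Record] = {
    var seen = Long.MinValue
    val late = Seq.newBuilder[Record]
    stream.foreach {
      case Watermark(ts) => seen = math.max(seen, ts)
      case r: Record     => if (r.rowtime < seen) late += r
    }
    late.result()
  }

  def main(args: Array[String]): Unit = {
    // (processing time of arrival in ms, record carrying its own rowtime)
    val arrivals = Seq(
      (10000L, Record("k1", 5000L)),
      (10400L, Record("k1", 3000L)),
      (11200L, Record("k2", 20000L))
    )
    val stream = assignByProcTime(arrivals, intervalMs = 1000L)
    println(lateRecords(stream).mkString(", "))
  }
}
```

In this model both `k1` records arrive after `Watermark(10000)` although their rowtimes are 5000 and 3000, so an operator that treats the watermark as an event-time lower bound would see them as late.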



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
