[ 
https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy updated FLINK-20909:
-------------------------
    Description: 
MiniBatch Interval derivation does not work well when enable miniBatch 
optimization in a job which contains deduplicate on row and unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |    cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      | SELECT a, b, c, rowtime
      | FROM (
      |   SELECT *,
      |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |   FROM T
      | )
      | WHERE rowNum = 1
      | )
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
E.g for the above sql, when enable MiniBatch optimization, the optimized plan 
is as following.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, 
COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, 
default_database, T]], fields=[a, b, c, rowtime]){code}
A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird because 
`Deduplicate` depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` 
will send watermark every specified interval second depends on process time. 
For `Deduplicate`, the incoming watermark does not relate to rowTime of 
incoming record, it cannot indicate rowTime of all following input records are 
all larger than or equals to the current incoming watermark.

 

  was:
MiniBatch Interval derivation does not work well when enable miniBatch 
optimization in a job which contains deduplicate on row and unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |    cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      | SELECT a, b, c, rowtime
      | FROM (
      |   SELECT *,
      |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |   FROM T
      | )
      | WHERE rowNum = 1
      | )
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
E.g for the above sql, when enable MiniBatch optimization, the optimized plan 
is as following.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, 
COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, 
default_database, T]], fields=[a, b, c, rowtime]){code}
A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird because 
`Deduplicate` depends on rowTime, however `ProcTimeMiniBatchAssignerOperator` 
will send watermark every each 5 second depends on process time. For 
`Deduplicate`, the incoming watermark does not relate to rowTime of incoming 
record, it cannot indicate rowTime of all following input records are all 
larger than or equals to the current incoming watermark.

 


> MiniBatch Interval derivation does not work well when enable miniBatch 
> optimization in a job which contains deduplicate on row and unbounded 
> aggregate.
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20909
>                 URL: https://issues.apache.org/jira/browse/FLINK-20909
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Andy
>            Priority: Minor
>
> MiniBatch Interval derivation does not work well when enable miniBatch 
> optimization in a job which contains deduplicate on row and unbounded 
> aggregate.
> {code:java}
> @Test
> def testLastRowOnRowtime1(): Unit = {
>   val t = env.fromCollection(rowtimeTestData)
>     .assignTimestampsAndWatermarks(new RowtimeExtractor)
>     .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
>   tEnv.registerTable("T", t)
>   tEnv.executeSql(
>     s"""
>        |CREATE TABLE rowtime_sink (
>        |    cnt BIGINT
>        |) WITH (
>        |  'connector' = 'values',
>        |  'sink-insert-only' = 'false',
>        |  'changelog-mode' = 'I,UA,D'
>        |)
>        |""".stripMargin)
>   val sql =
>     """
>       |INSERT INTO rowtime_sink
>       |SELECT COUNT(b) FROM (
>       | SELECT a, b, c, rowtime
>       | FROM (
>       |   SELECT *,
>       |     ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
>       |   FROM T
>       | )
>       | WHERE rowNum = 1
>       | )
>     """.stripMargin
>   tEnv.executeSql(sql).await()
>   val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
> }{code}
> E.g for the above sql, when enable MiniBatch optimization, the optimized plan 
> is as following.
> {code:java}
> Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
> +- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
>    +- Exchange(distribution=[single])
>       +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, 
> COUNT_RETRACT(*) AS count1$1])
>          +- Calc(select=[b])
>             +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
>                +- Exchange(distribution=[hash[b]])
>                   +- Calc(select=[b, rowtime])
>                      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                         +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c, rowtime]){code}
> A `StreamExecMiniBatchAssigner` will be inserted. The behavior is weird 
> because `Deduplicate` depends on rowTime, however 
> `ProcTimeMiniBatchAssignerOperator` will send watermark every specified 
> interval second depends on process time. For `Deduplicate`, the incoming 
> watermark does not relate to rowTime of incoming record, it cannot indicate 
> rowTime of all following input records are all larger than or equals to the 
> current incoming watermark.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to