[ 
https://issues.apache.org/jira/browse/FLINK-33760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-33760:
-------------------------------------

    Assignee: Yunhong Zheng

> Group Window agg has different result when only consuming -D records while 
> using or not using minibatch
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33760
>                 URL: https://issues.apache.org/jira/browse/FLINK-33760
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: xuyang
>            Assignee: Yunhong Zheng
>            Priority: Major
>
> Add the following test to AggregateITCase to reproduce this bug.
>  
> {code:java}
> @Test
> def test(): Unit = {
>   val upsertSourceCurrencyData = List(
>     changelogRow("-D", 1.bigDecimal, "a"),
>     changelogRow("-D", 1.bigDecimal, "b"),
>     changelogRow("-D", 1.bigDecimal, "b")
>   )
>   val upsertSourceDataId = registerData(upsertSourceCurrencyData);
>   tEnv.executeSql(s"""
>                      |CREATE TABLE T (
>                      | `a` DECIMAL(32, 8),
>                      | `d` STRING,
>                      | proctime as proctime()
>                      |) WITH (
>                      | 'connector' = 'values',
>                      | 'data-id' = '$upsertSourceDataId',
>                      | 'changelog-mode' = 'I,UA,UB,D',
>                      | 'failing-source' = 'true'
>                      |)
>                      |""".stripMargin)
>   val sql =
>     "SELECT max(a), sum(a), min(a), " +
>       "TUMBLE_START(proctime, INTERVAL '0.005' SECOND), " +
>       "TUMBLE_END(proctime, INTERVAL '0.005' SECOND), d " +
>       "FROM T GROUP BY d, TUMBLE(proctime, INTERVAL '0.005' SECOND)"
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
>   env.execute()
>   // Use the result precision/scale calculated for sum and don't override it with
>   // the one calculated for plus()/minus(), which results in losing a decimal digit.
>   val expected = List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
>   assertEquals(expected, sink.getRetractResults.sorted)
> }
> {code}
> When MiniBatch is ON, the result is `List()`.
>  
> When MiniBatch is OFF, the result is 
> `List(null,-1.00000000,null,2023-12-06T11:29:21.895,2023-12-06T11:29:21.900,a)`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)