xuyang created FLINK-33760:
------------------------------
Summary: Group Window agg has different result when only consuming -D records while using or not using minibatch
Key: FLINK-33760
URL: https://issues.apache.org/jira/browse/FLINK-33760
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Reporter: xuyang
Add the following test to AggregateITCase to reproduce this bug.
{code:scala}
@Test
def test(): Unit = {
  // The source emits only -D (delete) records.
  val upsertSourceCurrencyData = List(
    changelogRow("-D", 1.bigDecimal, "a"),
    changelogRow("-D", 1.bigDecimal, "b"),
    changelogRow("-D", 1.bigDecimal, "b")
  )
  val upsertSourceDataId = registerData(upsertSourceCurrencyData)
  tEnv.executeSql(s"""
    |CREATE TABLE T (
    |  `a` DECIMAL(32, 8),
    |  `d` STRING,
    |  proctime as proctime()
    |) WITH (
    |  'connector' = 'values',
    |  'data-id' = '$upsertSourceDataId',
    |  'changelog-mode' = 'I,UA,UB,D',
    |  'failing-source' = 'true'
    |)
    |""".stripMargin)
  // Group window aggregation over a 5 ms processing-time tumbling window.
  val sql =
    "SELECT max(a), sum(a), min(a), " +
      "TUMBLE_START(proctime, INTERVAL '0.005' SECOND), " +
      "TUMBLE_END(proctime, INTERVAL '0.005' SECOND), d " +
      "FROM T GROUP BY d, TUMBLE(proctime, INTERVAL '0.005' SECOND)"
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
  env.execute()
  // Use the result precision/scale calculated for sum and don't override with the one
  // calculated for plus()/minus(), which results in losing a decimal digit.
  val expected =
    List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
  assertEquals(expected, sink.getRetractResults.sorted)
}
{code}
When MiniBatch is ON, the result is `List()`.
When MiniBatch is OFF, the result is
`List(null,-1.00000000,null,2023-12-06T11:29:21.895,2023-12-06T11:29:21.900,a)`.
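For reference, a minimal sketch of how mini-batch could be switched on for this query through the table config. The option keys are Flink's standard mini-batch options; the latency and size values are illustrative assumptions, not the settings AggregateITCase itself uses, and tEnv is the table environment from the test above.

{code:scala}
// Assumed values for illustration only; not taken from the test harness.
tEnv.getConfig.set("table.exec.mini-batch.enabled", "true")
// With mini-batch enabled, allow-latency must be greater than zero
// and size must be positive.
tEnv.getConfig.set("table.exec.mini-batch.allow-latency", "1 s")
tEnv.getConfig.set("table.exec.mini-batch.size", "3")
{code}

Leaving table.exec.mini-batch.enabled at its default of false corresponds to the MiniBatch OFF case.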