xuyang created FLINK-34039:
------------------------------
Summary: The session group window agg operator does not split the session window when processing retract records.
Key: FLINK-34039
URL: https://issues.apache.org/jira/browse/FLINK-34039
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: xuyang
Add the following test to GroupWindowITCase to reproduce this bug.
{code:java}
@TestTemplate
def test(): Unit = {
  env.setParallelism(1)
  // The row at 6s bridges the rows at 1s/2s and 9s into one session; it is then deleted.
  val upsertSourceDataId = registerData(
    List(
      changelogRow("+I", "a", "no1", localDateTime(1L)),
      changelogRow("+I", "a", "no1", localDateTime(2L)),
      changelogRow("+I", "a", "no1", localDateTime(6L)),
      changelogRow("+I", "a", "no1", localDateTime(9L)),
      changelogRow("-D", "a", "no1", localDateTime(6L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE upsert_currency (
                     | pk STRING,
                     | str STRING,
                     | currency_time TIMESTAMP(3),
                     | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND
                     |) WITH (
                     | 'connector' = 'values',
                     | 'changelog-mode' = 'I,UB,UA,D',
                     | 'data-id' = '$upsertSourceDataId'
                     |)
                     |""".stripMargin)
  val sql =
    """
      |SELECT
      |pk,
      |COUNT(*) AS cnt,
      |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
      |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
      |FROM upsert_currency
      |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
      |""".stripMargin
  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
  println(sink.getAppendResults.sorted)
}
{code}
The result is:
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14
{code}
But if we change the source data as below (dropping both the insert and the delete of the row at 6s):
{code:java}
val upsertSourceDataId = registerData(
  List(
    changelogRow("+I", "a", "no1", localDateTime(1L)),
    changelogRow("+I", "a", "no1", localDateTime(2L)),
    // changelogRow("+I", "a", "no1", localDateTime(6L)),
    changelogRow("+I", "a", "no1", localDateTime(9L))
    // changelogRow("-D", "a", "no1", localDateTime(6L))
  ))
{code}
the result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07
{code}
Both inputs end with the same live rows (1s, 2s and 9s), so the results should be identical. With a 5-second gap, the row at 6s merges the sessions [1s, 7s) and [9s, 14s) into a single session [1s, 14s). When that row is deleted, the operator decrements the count to 3 but does not split the session back into [1s, 7s) and [9s, 14s).

When there is a mini-batch operator upstream, CDC messages may be folded (for example, the +I and -D of the same row may cancel out before reaching the window operator), so the results of the session group window agg node may differ depending on whether folding happened.
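To make the expected semantics concrete, here is a minimal Scala sketch, independent of Flink's runtime, that recomputes session windows from the rows still present after the changelog is applied. The object `SessionSplitSketch` and its helpers are hypothetical names for illustration only; the sketch assumes timestamps in seconds, a 5-second gap, and that a row at or before the current session's end joins that session. It is not the operator's implementation.

```scala
// Hypothetical sketch (not Flink code): derive session windows from the rows
// that remain after a changelog has been applied, to show the expected output.
object SessionSplitSketch {
  // A session window [start, end) holding `count` rows; end = last row time + gap.
  final case class Session(start: Long, end: Long, count: Int)

  // Apply "+I" / "-D" changes (timestamps in seconds) and return the live rows.
  def liveTimestamps(changes: Seq[(String, Long)]): Seq[Long] =
    changes.foldLeft(Vector.empty[Long]) {
      case (acc, ("+I", ts)) => acc :+ ts
      case (acc, ("-D", ts)) =>
        val i = acc.indexOf(ts)
        if (i >= 0) acc.patch(i, Nil, 1) else acc // retract one matching row
      case (acc, _) => acc // other change kinds are ignored in this sketch
    }

  // Group rows into sessions. Assumption of this sketch: a row whose time is
  // at or before the current session's end joins (and extends) that session.
  def sessions(timestamps: Seq[Long], gap: Long): List[Session] =
    timestamps.sorted
      .foldLeft(List.empty[Session]) {
        case (cur :: rest, ts) if ts <= cur.end =>
          // Rows are sorted, so the extended session ends at ts + gap.
          cur.copy(end = ts + gap, count = cur.count + 1) :: rest
        case (acc, ts) => Session(ts, ts + gap, 1) :: acc
      }
      .reverse

  def main(args: Array[String]): Unit = {
    val changes = Seq("+I" -> 1L, "+I" -> 2L, "+I" -> 6L, "+I" -> 9L, "-D" -> 6L)
    // Prints Session(1,7,2) and then Session(9,14,1).
    sessions(liveTimestamps(changes), gap = 5L).foreach(println)
  }
}
```

Because the sketch derives sessions from the live rows only, folding the +I/-D pair for the 6s row (as a mini-batch operator may do) gives the same answer as processing both messages one by one, which is the consistency the reproduction above expects from the operator.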