xuyang created FLINK-34039:
------------------------------

             Summary: The session group window agg operator does not split the session window when processing retract records.
                 Key: FLINK-34039
                 URL: https://issues.apache.org/jira/browse/FLINK-34039
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.18.0
            Reporter: xuyang


Add the following test to GroupWindowITCase to reproduce this bug:

{code:java}
@TestTemplate
def test(): Unit = {
  env.setParallelism(1)
  val upsertSourceDataId = registerData(
    List(
      changelogRow("+I", "a", "no1", localDateTime(1L)),
      changelogRow("+I", "a", "no1", localDateTime(2L)),
      // with a 5 s session gap, the row at 6 s bridges the rows at 2 s and 9 s
      changelogRow("+I", "a", "no1", localDateTime(6L)),
      changelogRow("+I", "a", "no1", localDateTime(9L)),
      // retracting the bridging row should split the session again
      changelogRow("-D", "a", "no1", localDateTime(6L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE upsert_currency (
                     |  pk STRING,
                     |  str STRING,
                     |  currency_time TIMESTAMP(3),
                     |  WATERMARK FOR currency_time AS currency_time - INTERVAL '5' SECOND
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'I,UB,UA,D',
                     |  'data-id' = '$upsertSourceDataId'
                     |)
                     |""".stripMargin)
  val sql =
    """
      |SELECT
      |pk,
      |COUNT(*) AS cnt,
      |SESSION_START(currency_time, INTERVAL '5' SECOND) as w_start,
      |SESSION_END(currency_time, INTERVAL '5' SECOND) as w_end
      |FROM upsert_currency
      |GROUP BY pk, SESSION(currency_time, INTERVAL '5' SECOND)
      |""".stripMargin
  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toDataStream.addSink(sink)
  env.execute()
  println(sink.getAppendResults.sorted)
}{code}
The result is:
{code:java}
a,3,1970-01-01T00:00:01,1970-01-01T00:00:14{code}
But if we change the source data as follows (dropping the +I of the row at 6 s and its later -D):
{code:java}
val upsertSourceDataId = registerData(
  List(
    changelogRow("+I", "a", "no1", localDateTime(1L)),
    changelogRow("+I", "a", "no1", localDateTime(2L)),
    // changelogRow("+I", "a", "no1", localDateTime(6L)),
    changelogRow("+I", "a", "no1", localDateTime(9L))
    // changelogRow("-D", "a", "no1", localDateTime(6L))
  )){code}

The result will be:
{code:java}
a,1,1970-01-01T00:00:09,1970-01-01T00:00:14
a,2,1970-01-01T00:00:01,1970-01-01T00:00:07{code}
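To see why the two runs should agree once the -D is applied, here is a minimal Scala sketch (not Flink code; `SessionSketch`, `Session` and `sessions` are hypothetical names) that recomputes session windows from scratch for a set of event timestamps in seconds with a 5 s gap. Boundary handling is simplified and may not match Flink's window merging exactly at the edge.

```scala
// Hypothetical model, not Flink's implementation: a session is
// [start, lastTimestamp + gap) and a new timestamp joins the open session
// if it falls before the session's current end.
object SessionSketch {
  final case class Session(start: Long, end: Long, count: Int)

  def sessions(timestamps: Seq[Long], gap: Long): List[Session] =
    timestamps.sorted.foldLeft(List.empty[Session]) {
      case (current :: done, ts) if ts < current.end =>
        // ts lies inside the open session: extend its end and bump the count
        Session(current.start, math.max(current.end, ts + gap), current.count + 1) :: done
      case (acc, ts) =>
        // first timestamp, or the gap was exceeded: open a new session
        Session(ts, ts + gap, 1) :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    // The row at 6 s bridges 2 s and 9 s, so everything merges.
    println(sessions(Seq(1L, 2L, 6L, 9L), 5L)) // List(Session(1,14,4))
    // Without the row at 6 s (i.e. after its -D), the session splits in two.
    println(sessions(Seq(1L, 2L, 9L), 5L)) // List(Session(1,7,2), Session(9,14,1))
  }
}
```

The second call reproduces the two windows of the second run, which is what the first run should also emit after the -D retracts the bridging row.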
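The -D retracts the row at 6 s, which is the only row bridging the rows at 2 s and 9 s (the session gap is 5 s). The operator decrements the count of the merged session [1 s, 14 s) to 3 but does not split it back into [1 s, 7 s) and [9 s, 14 s), which is what the second run produces. As a result, when there is a mini-batch operator upstream and CDC messages may be folded (for example, the +I and -D of the row at 6 s cancelling out), the session group window agg node may produce different results for the same input.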
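A hypothetical sketch of the folding described above, assuming a mini-batch simply cancels each +I against a -D of the same row within one batch (the real mini-batch operators are more involved; `FoldSketch`, `Change` and `fold` are made-up names, and rows are identified only by their timestamp here). Folding the first source data set this way leaves exactly the second source data set.

```scala
// Hypothetical folding model: rows are identified by timestamp only, and
// inserts and deletes of the same row cancel within one batch.
object FoldSketch {
  sealed trait Kind
  case object Insert extends Kind
  case object Delete extends Kind
  final case class Change(kind: Kind, ts: Long)

  def fold(batch: Seq[Change]): List[Long] = {
    // Net count per timestamp: +1 for each insert, -1 for each delete.
    val net = batch.foldLeft(Map.empty[Long, Int]) { (m, c) =>
      val delta = c.kind match {
        case Insert => 1
        case Delete => -1
      }
      m.updated(c.ts, m.getOrElse(c.ts, 0) + delta)
    }
    // Re-emit one insert per surviving row, in timestamp order.
    net.toList.sortBy(_._1).flatMap { case (ts, n) => List.fill(math.max(n, 0))(ts) }
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      Change(Insert, 1L), Change(Insert, 2L), Change(Insert, 6L),
      Change(Insert, 9L), Change(Delete, 6L))
    println(fold(batch)) // List(1, 2, 9)
  }
}
```

Because the downstream session window operator never sees the row at 6 s in the folded stream, it emits the two split sessions, while the unfolded stream yields the single merged session.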


