xuyang created FLINK-33489:
------------------------------

             Summary: LISTAGG with generating partial-final agg will cause wrong result
                 Key: FLINK-33489
                 URL: https://issues.apache.org/jira/browse/FLINK-33489
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0, 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.9.0
            Reporter: xuyang


Adding the following test case to SplitAggregateITCase reproduces this bug:

{code:java}
@Test
def testListAggWithDistinctMultiArgs(): Unit = {
  val t1 = tEnv.sqlQuery(s"""
                            |SELECT
                            |  a,
                            |  LISTAGG(DISTINCT c, '#')
                            |FROM T
                            |GROUP BY a
     """.stripMargin)

  val sink = new TestingRetractSink
  t1.toRetractStream[Row].addSink(sink)
  env.execute()

  // Distinct values of c per key a; LISTAGG should join them with '#' (order not guaranteed).
  val expected = Map[String, List[String]](
    "1" -> List("Hello 0", "Hello 1"),
    "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
    "3" -> List("Hello 0", "Hello 1"),
    "4" -> List("Hello 1", "Hello 2", "Hello 3")
  )
  val actualData = sink.getRetractResults.sorted
  println(actualData)
} {code}
The printed `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`: the delimiter `#` is ignored, and the values are joined with `,` instead.
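Because LISTAGG does not guarantee element order, the `expected` map is best compared per key after splitting on `#` and sorting. A minimal Java sketch of that check, assuming rows have the `<a>,<LISTAGG result>` shape seen in `actualData`; the class `ListAggRowCheck` and its helper `aggValues` are hypothetical, not part of Flink's test utilities. It is applied to the row for key `2` from `actualData` and to a hypothetical row with `#` honoured:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for checking LISTAGG rows independently of element order.
public class ListAggRowCheck {

    // Takes a row of the form "<a>,<LISTAGG result>", drops the key before the
    // first comma, splits the rest on `delimiter`, and returns the values sorted.
    static List<String> aggValues(String row, String delimiter) {
        String agg = row.substring(row.indexOf(',') + 1);
        List<String> values = new ArrayList<>(Arrays.asList(agg.split(Pattern.quote(delimiter))));
        Collections.sort(values);
        return values;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4");
        // Row for key 2 as printed in actualData (joined with ",").
        String observed = "2,Hello 2,Hello 4,Hello 3,Hello 1,Hello 0";
        // Hypothetical row if the '#' delimiter were honoured.
        String withHash = "2,Hello 2#Hello 4#Hello 3#Hello 1#Hello 0";
        System.out.println(aggValues(observed, "#").equals(expected)); // false
        System.out.println(aggValues(withHash, "#").equals(expected)); // true
    }
}
```

With the bug, the whole aggregate comes back as a single `#`-free string, so the per-key comparison fails.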

The plan for this query is:
{code:java}
LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
+- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, LISTAGG_RETRACT($f3_0) AS $f1])
   +- Exchange(distribution=[hash[a]])
      +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
         +- Exchange(distribution=[hash[a, $f3, $f4]])
            +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                  +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c]) {code}
The FINAL `GroupAggregate` calls `LISTAGG_RETRACT($f3_0)` without the delimiter argument, so it joins the partial results with the default delimiter `,`.
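To illustrate why the missing argument matters, here is a toy Java model of the PARTIAL/FINAL split in the plan above. It is a sketch under stated assumptions, not Flink's implementation: `String.hashCode()` stands in for `HASH_CODE`, each phase is a plain in-memory group-by, and `SplitListAggModel`, `partialPhase` and `finalPhase` are hypothetical names. In this model the PARTIAL phase joins each bucket's distinct values with `#`, but with 1024 buckets each distinct value usually lands in its own bucket, so the delimiter that shows up in the output is the one the FINAL phase uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model of the PARTIAL/FINAL split for
// LISTAGG(DISTINCT c, delimiter) ... GROUP BY a; not Flink code.
public class SplitListAggModel {

    // Mirrors the 1024 buckets in MOD(HASH_CODE(c), 1024) from the plan.
    static final int BUCKETS = 1024;

    // PARTIAL phase: group by (a, bucket of c), drop duplicate c values,
    // and join each bucket's values with the user-supplied delimiter.
    static Map<String, List<String>> partialPhase(List<String[]> rows, String delimiter) {
        Map<String, Map<Integer, Set<String>>> groups = new TreeMap<>();
        for (String[] row : rows) {
            String a = row[0];
            String c = row[1];
            // Assumption: String.hashCode() stands in for Flink's HASH_CODE.
            int bucket = Math.floorMod(c.hashCode(), BUCKETS);
            groups.computeIfAbsent(a, k -> new TreeMap<>())
                  .computeIfAbsent(bucket, k -> new LinkedHashSet<>())
                  .add(c);
        }
        Map<String, List<String>> partials = new TreeMap<>();
        groups.forEach((a, buckets) -> {
            List<String> parts = new ArrayList<>();
            for (Set<String> values : buckets.values()) {
                parts.add(String.join(delimiter, values));
            }
            partials.put(a, parts);
        });
        return partials;
    }

    // FINAL phase: concatenate the partial strings of each key with `delimiter`.
    static Map<String, String> finalPhase(Map<String, List<String>> partials, String delimiter) {
        Map<String, String> result = new TreeMap<>();
        partials.forEach((a, parts) -> result.put(a, String.join(delimiter, parts)));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"1", "Hello 0"},
                new String[] {"1", "Hello 1"},
                new String[] {"1", "Hello 0"});
        Map<String, List<String>> partials = partialPhase(rows, "#");
        // FINAL without the delimiter argument falls back to ",".
        System.out.println(finalPhase(partials, ","));
        // FINAL with the delimiter forwarded from the query.
        System.out.println(finalPhase(partials, "#"));
    }
}
```

In this model, the first call reproduces the `,`-joined shape seen in `actualData`, while forwarding the delimiter to the FINAL phase (the second call) yields `#`-joined output.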

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
