[ 
https://issues.apache.org/jira/browse/FLINK-33689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuai Xu updated FLINK-33689:
-----------------------------
    Summary: JsonObjectAggFunction can't retract previous data which is invalid 
when enabling local global agg  (was: JsonObjectAggFunction can't retract 
previous data which is invalid when enable local global agg)

> JsonObjectAggFunction can't retract previous data which is invalid when 
> enabling local global agg
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33689
>                 URL: https://issues.apache.org/jira/browse/FLINK-33689
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.0
>            Reporter: Shuai Xu
>            Priority: Major
>              Labels: pull-request-available
>
> Run the following test in sql/AggregateITCase with LocalGlobal enabled.
> {code:java}
> def testGroupJsonObjectAggWithRetract(): Unit = {
>   val data = new mutable.MutableList[(Long, String, Long)]
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   data.+=((2L, "Hallo", 2L))
>   val sql =
>     s"""
>        |select
>        |   JSON_OBJECTAGG(key k value v)
>        |from (select
>        |             cast(SUM(a) as string) as k,t as v
>        |       from
>        |             Table6
>        |       group by t)
>        |""".stripMargin
>   val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
>   tEnv.createTemporaryView("Table6", t)
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected =
>     List(
>       "{\"30\":2}"
>     )
>   assertThat(sink.getRetractResults).isEqualTo(expected)
> } {code}
> The result is as follows:
> {code:java}
> List({"14":2,"30":2}) {code}
> However, {"14":2} is an intermediate result and should have been retracted, leaving only {"30":2}.
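
To make the expected behavior concrete, here is a minimal sketch of how a retractable JSON_OBJECTAGG accumulator would be expected to react to the changelog implied by the report. It is a toy model, not Flink's JsonObjectAggFunction: the class name, the method signatures, and the exact message sequence (an intermediate sum of 14 followed by the final sum of 30) are assumptions made for illustration, and JSON key escaping is ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Toy model of a retractable JSON_OBJECTAGG accumulator; hypothetical, not Flink code. */
class JsonObjectAggRetractSketch {

    // Accumulated key/value pairs, kept in insertion order.
    private final Map<String, Long> entries = new LinkedHashMap<>();

    /** Handles an insert message from the upstream aggregate. */
    void accumulate(String key, long value) {
        entries.put(key, value);
    }

    /** Handles a retract message: the stale key must be dropped. */
    void retract(String key) {
        entries.remove(key);
    }

    /** Renders the accumulator as a JSON object string (no key escaping). */
    String getValue() {
        return entries.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
                .collect(Collectors.joining(",", "{", "}"));
    }

    /**
     * Replays the assumed changelog for the group t = 2.
     * With applyRetract = false the retraction is skipped, which reproduces
     * the output shown in the report.
     */
    static String replay(boolean applyRetract) {
        JsonObjectAggRetractSketch acc = new JsonObjectAggRetractSketch();
        acc.accumulate("14", 2L);   // intermediate SUM(a), assumed to arrive first
        if (applyRetract) {
            acc.retract("14");      // upstream withdraws the intermediate row
        }
        acc.accumulate("30", 2L);   // final SUM(a) over all 15 rows
        return acc.getValue();
    }

    public static void main(String[] args) {
        System.out.println(replay(true));   // {"30":2}
        System.out.println(replay(false));  // {"14":2,"30":2}
    }
}
```

When the retraction is applied, the sketch yields the value the test expects. When it is skipped, the stale key survives, which matches the reported output. The sketch only models the symptom and makes no claim about where in the local/global aggregation path the retraction is lost.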



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
