Shuai Xu created FLINK-33689:
--------------------------------
Summary: jsonObjectAggFunction can't retract previous data which
is invalid when enable local global agg
Key: FLINK-33689
URL: https://issues.apache.org/jira/browse/FLINK-33689
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.18.0
Reporter: Shuai Xu
Run the test as following and enable LocalGlobal and minibatch in
sql/AggregateITCase .
{code:java}
//代码占位符
def testGroupJsonObjectAggWithRetract(): Unit = {
val data = new mutable.MutableList[(Long, String, Long)]
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
data.+=((2L, "Hallo", 2L))
val sql =
s"""
|select
| JSON_OBJECTAGG(key k value v)
|from (select
| cast(SUM(a) as string) as k,t as v
| from
| Table6
| group by t)
|""".stripMargin
val t = failingDataSource(data).toTable(tEnv, 'a, 'c, 't)
tEnv.createTemporaryView("Table6", t)
val sink = new TestingRetractSink
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
val expected =
List(
"{\"30\":2}"
)
assertThat(sink.getRetractResults).isEqualTo(expected)
} {code}
The result is as following.
{code:java}
//代码占位符
List({"14":2,"30":2}) {code}
However, \{"14":2} should be retracted.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)