sunjincheng created FLINK-6650:
----------------------------------

Summary: Fix Non-windowed group-aggregate error when using append-table mode.
Key: FLINK-6650
URL: https://issues.apache.org/jira/browse/FLINK-6650
Project: Flink
Issue Type: Sub-task
Reporter: sunjincheng
Assignee: sunjincheng
When I test a non-windowed group aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is that {{DataStreamGroupAggregate#producesUpdates}} is hard-coded to {{true}}:
{code}
override def producesUpdates = true
{code}
From the user's point of view, I think both of the following behaviors are useful. For example, given this data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}
*Case 1:* Table API (append mode, every intermediate result is emitted)
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
  .addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
1
3
6
10
15
21
28
36
56
{code}
*Case 2:* Table API (retract mode, only the final result remains after retractions)
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
  .addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
In fact, Case 1 can already be expressed with an unbounded OVER window:
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
  .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
  .select('a.sum over 'w)
  .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
Same as Case 1
{code}
However, after [FLINK-6649|https://issues.apache.org/jira/browse/FLINK-6649], an OVER window can no longer express Case 1 with early firing. So I still think a non-windowed group aggregate should not always produce an update table; users should be able to decide which mode to use. Is there any drawback to this improvement? Any feedback is welcome.
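To make the difference between the two output modes concrete, here is a minimal, Flink-free Scala sketch. It does not use the Flink Table API at all; the object {{GroupAggModes}} and its methods are hypothetical names for illustration only. It simulates a non-windowed running {{SUM}} over column {{a}} of the data above: append mode emits every intermediate sum (Case 1), while retract mode emits a retraction of the previous sum before each new sum, so a sink that applies the retractions keeps only the final value (Case 2).

{code}
// Hypothetical, Flink-free simulation of the two output modes of a
// non-windowed running SUM. Not Flink's API or implementation.
object GroupAggModes {
  // A change message in a retract stream: (isAccumulate, value).
  type Change = (Boolean, Long)

  // Append mode (Case 1): every updated running SUM becomes a new row.
  def appendMode(values: Seq[Long]): Seq[Long] =
    values.scanLeft(0L)(_ + _).tail

  // Retract mode: for each new input, retract the previous sum (if any)
  // and then accumulate the new sum.
  def retractMode(values: Seq[Long]): Seq[Change] = {
    val sums = appendMode(values).toVector
    sums.indices.flatMap { i =>
      if (i == 0) Seq((true, sums(i)))
      else Seq((false, sums(i - 1)), (true, sums(i)))
    }
  }

  // What a retracting sink keeps after applying all changes (Case 2):
  // accumulate messages add a row, retract messages remove one matching row.
  def materialize(changes: Seq[Change]): List[Long] =
    changes.foldLeft(List.empty[Long]) {
      case (acc, (true, v)) => acc :+ v
      case (acc, (false, v)) =>
        val idx = acc.indexOf(v)
        if (idx >= 0) acc.patch(idx, Nil, 1) else acc
    }

  def main(args: Array[String]): Unit = {
    // Column 'a of the example data.
    val a = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)
    println(appendMode(a).mkString(" "))               // Case 1 output
    println(materialize(retractMode(a)).mkString(" ")) // Case 2 output
  }
}
{code}

Running {{main}} prints {{1 3 6 10 15 21 28 36 56}} and then {{56}}, matching the results listed for Case 1 and Case 2. The point of the sketch is that both outputs come from the same aggregation; only the way changes are emitted differs, which is why letting the user choose the mode seems reasonable.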