[ https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019334#comment-16019334 ]
ASF GitHub Bot commented on FLINK-6650:
---------------------------------------

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3958

    [FLINK-6650][table] Improve the error message for toAppendStream

    The PR has three small changes:
    1. Improve the error message for toAppendStream.
    2. Change an incorrect variable name.
    3. Add JavaDoc for the key parameter of a method.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-6650][table] Improve the error message for toAppendStream")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6650-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3958

----
----

> Fix Non-windowed group-aggregate error when using append-table mode.
> --------------------------------------------------------------------
>
>                 Key: FLINK-6650
>                 URL: https://issues.apache.org/jira/browse/FLINK-6650
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> When I test a non-windowed group aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I get the following error:
> {code}
> org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
> 	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
> 	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
> 	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
> 	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
> 	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
> {code}
> The reason is that {{DataStreamGroupAggregate#producesUpdates}} is defined as follows:
> {code}
> override def producesUpdates = true
> {code}
> I think that, from the user's point of view, the expected results are as follows (for example):
> Data:
> {code}
> val data = List(
>   (1L, 1, "Hello"),
>   (2L, 2, "Hello"),
>   (3L, 3, "Hello"),
>   (4L, 4, "Hello"),
>   (5L, 5, "Hello"),
>   (6L, 6, "Hello"),
>   (7L, 7, "Hello World"),
>   (8L, 8, "Hello World"),
>   (20L, 20, "Hello World"))
> {code}
> * Case 1:
> TableAPI
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
>   .toAppendStream[Row].addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> // Data processed by StringSink:
> 1
> 3
> 6
> 10
> 15
> 21
> 28
> 36
> 56
> // Final output data:
> 1
> 3
> 6
> 10
> 15
> 21
> 28
> 36
> 56
> {code}
> * Case 2:
> TableAPI
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
>   .addSink(new StreamITCase.RetractingSink)
> {code}
> Result:
> {code}
> // Data processed by RetractingSink:
> (true,1)
> (false,1)
> (true,3)
> (false,3)
> (true,6)
> (false,6)
> (true,10)
> (false,10)
> (true,15)
> (false,15)
> (true,21)
> (false,21)
> (true,28)
> (false,28)
> (true,36)
> (false,36)
> (true,56)
> // Final output data:
> 56
> {code}
> In fact, Case 1 can be expressed with an unbounded OVER window, as follows:
> TableAPI
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
>   .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
>   .select('a.sum over 'w)
>   .toAppendStream[Row].addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> Same as Case 1
> {code}
> However, after [FLINK-6649|https://issues.apache.org/jira/browse/FLINK-6649], an OVER window cannot express Case 1 with early firing.
> So I still think that a non-windowed group aggregate should not always produce an update table; the user should be able to decide which mode to use.
> Are there any drawbacks to this improvement? Any feedback is welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
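The failure happens because `toAppendStream` rejects any plan that contains an operator declaring `producesUpdates = true`, which is what `DataStreamGroupAggregate` does. The exception in the stack trace is thrown from `StreamTableEnvironment.translate`. The sketch below models that check in a few lines of Scala. It assumes a simplified plan tree: `PlanNode`, `Scan`, `GroupAggregate`, `OverAggregate`, `NotAppendOnlyException` and `AppendCheck` are hypothetical names used only for illustration, not Flink's real classes or API.

```scala
// Hypothetical, simplified model of the append-only check; NOT Flink's real classes.
sealed trait PlanNode {
  def producesUpdates: Boolean
  def inputs: Seq[PlanNode]
}

// A source scan only appends rows.
final case class Scan(name: String) extends PlanNode {
  val producesUpdates = false
  val inputs: Seq[PlanNode] = Nil
}

// A non-windowed group aggregate updates its result for every input row,
// mirroring `override def producesUpdates = true` in DataStreamGroupAggregate.
final case class GroupAggregate(input: PlanNode) extends PlanNode {
  val producesUpdates = true
  val inputs: Seq[PlanNode] = Seq(input)
}

// An unbounded OVER window emits one new row per input row and never updates,
// which is why the OVER variant of Case 1 works with toAppendStream.
final case class OverAggregate(input: PlanNode) extends PlanNode {
  val producesUpdates = false
  val inputs: Seq[PlanNode] = Seq(input)
}

// Hypothetical stand-in for the TableException seen in the stack trace.
final class NotAppendOnlyException(msg: String) extends RuntimeException(msg)

object AppendCheck {
  // A plan is append-only if no node anywhere in it produces updates.
  def isAppendOnly(node: PlanNode): Boolean =
    !node.producesUpdates && node.inputs.forall(isAppendOnly)

  // Models the check performed before an append-mode conversion.
  def translateToAppend(plan: PlanNode): PlanNode =
    if (isAppendOnly(plan)) plan
    else throw new NotAppendOnlyException(
      "Table is not an append-only table. Output needs to handle update and delete changes.")

  def main(args: Array[String]): Unit = {
    val source = Scan("stream")
    println(isAppendOnly(OverAggregate(source)))  // true
    println(isAppendOnly(GroupAggregate(source))) // false
    try translateToAppend(GroupAggregate(source))
    catch { case e: NotAppendOnlyException => println(e.getMessage) }
  }
}
```

In this model, the change proposed in the issue would amount to letting the user, rather than the `producesUpdates` flag alone, decide whether a group aggregate may be emitted in append mode.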
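The difference between Case 1 and Case 2 can be reproduced without Flink by replaying column `a` of the sample data. The sketch below uses plain Scala collections. `appendRunningSums` mirrors the Case 1 (or unbounded OVER window) output, where each new running sum is a new row. `retractRunningSums` mirrors the Case 2 retract stream, where each new sum is preceded by a retraction of the previous one. `materialize` plays the role of a retracting sink. All of these names are hypothetical helpers, not Flink API.

```scala
// Hypothetical helpers simulating the two output modes; no Flink dependency.
object OutputModes {
  // Column 'a of the sample data from the issue.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Append mode (Case 1 / unbounded OVER window): one new row per input row.
  def appendRunningSums(xs: List[Long]): List[Long] =
    xs.scanLeft(0L)(_ + _).tail

  // Retract mode (Case 2): before emitting a new sum, retract the previous one.
  def retractRunningSums(xs: List[Long]): List[(Boolean, Long)] = {
    val sums = appendRunningSums(xs)
    val updates = sums.zip(sums.tail).flatMap { case (prev, next) =>
      List((false, prev), (true, next))
    }
    sums.headOption.map(first => (true, first)).toList ++ updates
  }

  // A retracting sink: add a row on (true, v), remove one occurrence on (false, v).
  def materialize(changes: List[(Boolean, Long)]): List[Long] =
    changes.foldLeft(List.empty[Long]) {
      case (acc, (true, v))  => acc :+ v
      case (acc, (false, v)) => acc.diff(List(v))
    }

  def main(args: Array[String]): Unit = {
    println(appendRunningSums(a).mkString(", "))         // 1, 3, 6, 10, 15, 21, 28, 36, 56
    println(retractRunningSums(a).take(4).mkString(", ")) // (true,1), (false,1), (true,3), (false,3)
    println(materialize(retractRunningSums(a)))           // List(56)
  }
}
```

Both modes process the same sequence of running sums. They differ only in whether earlier results stay in the sink (append) or are retracted so that only the latest value, 56, remains.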