[ https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16018620#comment-16018620 ]
Fabian Hueske commented on FLINK-6650:
--------------------------------------

Thanks for starting this discussion.

I think the current behavior is correct. The result of a query should depend only on the operators of the query ({{select('a.sum)}}) and not on the method used to convert the result into a stream. The choice between {{toAppendStream()}} and {{toRetractStream()}} should only affect the representation of the result, not the result itself. Moreover, the result of a query on a streaming table must be identical to the result of a batch query on the same input data. Hence, the result shown for case 1 would not be correct: it should be a single row with the single value {{56}}, as in case 2, and not multiple rows.

The correct way to specify the desired result of case 1 is the OVER query shown in the description. The result of an OVER query can be converted into an append stream (unless we add / enable support for late data).

To summarize: the drawbacks of implementing this would be incorrect semantics (batch != streaming) and unclear behavior (the result of a query would depend on the conversion method).

> Fix Non-windowed group-aggregate error when using append-table mode.
> --------------------------------------------------------------------
>
>                 Key: FLINK-6650
>                 URL: https://issues.apache.org/jira/browse/FLINK-6650
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> When I tested a non-windowed group aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I got the following error:
> {code}
> org.apache.flink.table.api.TableException: Table is not an append-only table.
> Output needs to handle update and delete changes.
>   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
>   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
>   at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
>   at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
>   at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
> {code}
> The reason is {{DataStreamGroupAggregate#producesUpdates}}:
> {code}
> override def producesUpdates = true
> {code}
> I think that, from the user's point of view, the desired results are (for example):
> Data:
> {code}
> val data = List(
>   (1L, 1, "Hello"),
>   (2L, 2, "Hello"),
>   (3L, 3, "Hello"),
>   (4L, 4, "Hello"),
>   (5L, 5, "Hello"),
>   (6L, 6, "Hello"),
>   (7L, 7, "Hello World"),
>   (8L, 8, "Hello World"),
>   (20L, 20, "Hello World"))
> {code}
> * Case 1:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
>   .addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> 1
> 3
> 6
> 10
> 15
> 21
> 28
> 36
> 56
> {code}
> * Case 2:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
>   .addSink(new StreamITCase.RetractingSink)
> {code}
> Result:
> {code}
> 56
> {code}
> In fact, case 1 can be expressed with an unbounded OVER window, as follows:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
>   .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
>   .select('a.sum over 'w)
>   .toAppendStream[Row].addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> Same as case 1
> {code}
> But after [FLINK-6649|https://issues.apache.org/jira/browse/FLINK-6649], OVER cannot express case 1 with early firing.
> So I still think that a non-windowed group aggregate should not always produce an update table; the user should be able to decide which mode to use.
> Is there any drawback to this improvement? Any feedback is welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
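The distinction drawn in the comment between a query's result and its stream representation can be made concrete with a minimal, Flink-free Scala sketch over the data from the description. It is an illustration under stated assumptions, not Flink's implementation: the object and function names (`ChangelogSketch`, `groupAggregateChangelog`, `materialize`, `overRunningSum`) are hypothetical, and a retract stream is modeled simply as `(Boolean, Long)` pairs where `true` means add and `false` means retract, loosely mirroring the `(Boolean, Row)` pairs produced by `toRetractStream`.

```scala
// Hypothetical, Flink-free model of the two queries discussed in FLINK-6650.
// Not Flink code: it only simulates the changelog semantics on plain collections.
object ChangelogSketch {

  // Input rows (a, b, c), copied from the issue description.
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello"),
    (4L, 4, "Hello"), (5L, 5, "Hello"), (6L, 6, "Hello"),
    (7L, 7, "Hello World"), (8L, 8, "Hello World"),
    (20L, 20, "Hello World"))

  // Assumed encoding of a retract-stream message: true = add, false = retract.
  type Change = (Boolean, Long)

  // Non-windowed SUM(a): every input row updates the single result row, so the
  // previously emitted value is retracted before the new value is added.
  def groupAggregateChangelog(rows: List[(Long, Int, String)]): List[Change] = {
    val (changes, _) = rows.foldLeft((Vector.empty[Change], Option.empty[Long])) {
      case ((acc, prev), (a, _, _)) =>
        val next = prev.getOrElse(0L) + a
        val retraction = prev.toList.map(p => (false, p))
        ((acc ++ retraction) :+ ((true, next)), Some(next))
    }
    changes.toList
  }

  // Materialize a retract stream into a table: apply adds and retractions to a multiset.
  def materialize(changes: List[Change]): List[Long] = {
    val counts = changes.foldLeft(Map.empty[Long, Int]) { case (m, (isAdd, v)) =>
      val c = m.getOrElse(v, 0) + (if (isAdd) 1 else -1)
      if (c == 0) m - v else m.updated(v, c)
    }
    counts.toList.flatMap { case (v, c) => List.fill(c)(v) }.sorted
  }

  // OVER (ORDER BY proctime ROWS UNBOUNDED PRECEDING): exactly one output row per
  // input row, never updated afterwards, so the result is append-only.
  def overRunningSum(rows: List[(Long, Int, String)]): List[Long] =
    rows.map(_._1).scanLeft(0L)(_ + _).tail

  def main(args: Array[String]): Unit = {
    val changelog = groupAggregateChangelog(data)
    println(changelog.filter(_._1).map(_._2)) // add messages only (retractions dropped)
    println(materialize(changelog))           // final table of the non-windowed aggregate
    println(List(data.map(_._1).sum))         // batch result on the same input
    println(overRunningSum(data))             // result of the OVER query
  }
}
```

In this model, keeping only the add messages yields the running sums that case 1 expects, which coincide with the OVER query's output, whereas applying the retractions yields the single row 56 that the batch query also returns. This is one way to read the objection in the comment: an append-mode conversion of the non-windowed aggregate would silently change the query result rather than only its representation.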