[jira] [Created] (FLINK-6651) Clearing registeredStates map should be protected in SharedStateRegistry#clear
Ted Yu created FLINK-6651:
Summary: Clearing registeredStates map should be protected in SharedStateRegistry#clear
Key: FLINK-6651
URL: https://issues.apache.org/jira/browse/FLINK-6651
Project: Flink
Issue Type: Bug
Reporter: Ted Yu
Priority: Minor

{code}
public void clear() {
  registeredStates.clear();
}
{code}

Elsewhere in SharedStateRegistry, a lock is taken on registeredStates before each operation. We should do the same in the clear() method.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
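The locking discipline the issue asks for can be illustrated with a small standalone sketch (this is a hypothetical class, not Flink's actual SharedStateRegistry): every method that touches the shared map, including clear(), synchronizes on the map itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed fix: all accesses to the shared map,
// including clear(), take the same lock, so clear() cannot race with a
// concurrent register/unregister.
class SharedStateRegistrySketch {
    private final Map<String, Integer> registeredStates = new HashMap<>();

    public void register(String key, int refCount) {
        synchronized (registeredStates) {
            registeredStates.put(key, refCount);
        }
    }

    // The change proposed in FLINK-6651: guard clear() with the same lock.
    public void clear() {
        synchronized (registeredStates) {
            registeredStates.clear();
        }
    }

    public int size() {
        synchronized (registeredStates) {
            return registeredStates.size();
        }
    }
}
```

Without the lock in clear(), a thread clearing the map while another thread registers state could observe a partially cleared map or corrupt a non-thread-safe HashMap.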
[jira] [Created] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.
sunjincheng created FLINK-6650:
Summary: Fix Non-windowed group-aggregate error when using append-table mode.
Key: FLINK-6650
URL: https://issues.apache.org/jira/browse/FLINK-6650
Project: Flink
Issue Type: Sub-task
Reporter: sunjincheng
Assignee: sunjincheng

When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I get the following error:

{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
  at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
  at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
  at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}

The reason is that {{DataStreamGroupAggregate#producesUpdates}} is defined as follows:

{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, the desired behavior is (for example):

Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}

* Case 1: TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
  .addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
1 3 6 10 15 21 28 36 56
{code}

* Case 2: TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
  .addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}

In fact, for Case 1 we can use an unbounded OVER window, as follows:

TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
  .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
  .select('a.sum over 'w)
  .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
Same as Case 1
{code}

But after [FLINK-6649|https://issues.apache.org/jira/browse/FLINK-6649], an OVER window can no longer express Case 1 with earlyFiring. So I still think the non-windowed group-aggregate is not always an update table; the user should be able to decide which mode to use. Is there any drawback to this improvement? Any feedback is welcome.
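The difference between the two conversion modes in the issue can be sketched outside of Flink (this is a hypothetical illustration, not Flink's API): a retract stream encodes each updated result as a pair of messages, a retraction of the old value (flag = false) followed by the new value (flag = true), which an append-only sink cannot consume.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a running sum emitted as a retract stream:
// every new result first retracts the previous one, then adds the new one.
// A retracting sink that applies all messages ends up with only the final
// value; an append sink would need the plain insert-only stream instead.
class RetractSumSketch {
    public static List<String> retractStream(long[] values) {
        List<String> out = new ArrayList<>();
        long sum = 0;
        boolean first = true;
        for (long v : values) {
            if (!first) {
                out.add("(false," + sum + ")"); // retract the previous result
            }
            sum += v;
            out.add("(true," + sum + ")");      // emit the updated result
            first = false;
        }
        return out;
    }
}
```

For the input 1, 2, 3 this produces (true,1), (false,1), (true,3), (false,3), (true,6); materialized by a retracting sink, only 6 survives, which matches Case 2 above, while the append view of Case 1 keeps every intermediate sum.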
Re: [DISCUSS] Backwards compatibility policy.
Hi Chesnay,

I believe that for APIs we already have a pretty clear policy with the annotations. I was referring to savepoint- and state-related backwards compatibility.

> On May 20, 2017, at 7:20 PM, Chesnay Schepler wrote:
>
> I think it would be good to clarify what kind of backwards compatibility we're talking about here. As in, are we talking about APIs or savepoints?
>
> On 20.05.2017 19:09, Kostas Kloudas wrote:
>> Hi all,
>>
>> As we are getting closer to releasing Flink-1.3, I would like to open a discussion on how far back we provide backwards compatibility.
>>
>> The reason for opening the discussion is that i) for the users and for the adoption of the project, it is good to have an explicitly stated policy that implies certain guarantees, and ii) keeping code and tests for backwards compatibility with Flink-1.1 does not offer much. On the contrary, I think that it leads to:
>>
>> 1) dead or ugly code in the codebase, e.g. deprecated class fields that could go away, and ugly if() branches (see the aligned window operators that were deprecated in 1.2 and are now normal windows), etc.
>> 2) expensive tests (as, normally, they read from a savepoint)
>> 3) binary files in the codebase for holding the aforementioned savepoints
>>
>> My proposal for such a policy would be to offer backwards compatibility for one previous version.
>>
>> This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows a clear "backwards compatibility" path when jumping versions (a user that goes from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 -> 1.3), while also allowing us to clean up the codebase a bit.
>>
>> What do you think?
>>
>> Kostas
[jira] [Created] (FLINK-6649) Improve Non-window group aggregate with configurable `earlyFire`.
sunjincheng created FLINK-6649:
Summary: Improve Non-window group aggregate with configurable `earlyFire`.
Key: FLINK-6649
URL: https://issues.apache.org/jira/browse/FLINK-6649
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng

Currently, the non-windowed group aggregate early-fires at count(1), that is, every input row emits an aggregate result. But sometimes users want to configure the count (`early firing with count[N]`) to reduce the downstream pressure. This JIRA will enable configuration of `earlyFiring` for the non-windowed group aggregate.
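The count-based early firing being proposed can be sketched standalone (this is a hypothetical illustration, not Flink's configuration API): instead of emitting the aggregate after every row, the operator emits only after every N input rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-based early firing for a non-windowed sum:
// the aggregate is emitted only after every `fireEvery` rows rather than
// after every row (count(1)), reducing downstream pressure.
class EarlyFireSketch {
    public static List<Long> sumWithEarlyFire(long[] values, int fireEvery) {
        List<Long> emitted = new ArrayList<>();
        long sum = 0;
        int sinceLastFire = 0;
        for (long v : values) {
            sum += v;
            sinceLastFire++;
            if (sinceLastFire == fireEvery) {
                emitted.add(sum);   // early-fire the current aggregate
                sinceLastFire = 0;
            }
        }
        if (sinceLastFire > 0) {
            emitted.add(sum);       // flush the trailing partial batch
        }
        return emitted;
    }
}
```

For the rows 1..5 with fireEvery = 2, only the sums 3, 10, and 15 are emitted instead of all five intermediate sums; with fireEvery = 1 the sketch degenerates to today's count(1) behavior.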
Re: [DISCUSS] Backwards compatibility policy.
I think it would be good to clarify what kind of backwards compatibility we're talking about here. As in, are we talking about APIs or savepoints?

On 20.05.2017 19:09, Kostas Kloudas wrote:

Hi all,

As we are getting closer to releasing Flink-1.3, I would like to open a discussion on how far back we provide backwards compatibility.

The reason for opening the discussion is that i) for the users and for the adoption of the project, it is good to have an explicitly stated policy that implies certain guarantees, and ii) keeping code and tests for backwards compatibility with Flink-1.1 does not offer much. On the contrary, I think that it leads to:

1) dead or ugly code in the codebase, e.g. deprecated class fields that could go away, and ugly if() branches (see the aligned window operators that were deprecated in 1.2 and are now normal windows), etc.
2) expensive tests (as, normally, they read from a savepoint)
3) binary files in the codebase for holding the aforementioned savepoints

My proposal for such a policy would be to offer backwards compatibility for one previous version.

This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows a clear "backwards compatibility" path when jumping versions (a user that goes from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 -> 1.3), while also allowing us to clean up the codebase a bit.

What do you think?

Kostas
[DISCUSS] Backwards compatibility policy.
Hi all,

As we are getting closer to releasing Flink-1.3, I would like to open a discussion on how far back we provide backwards compatibility.

The reason for opening the discussion is that i) for the users and for the adoption of the project, it is good to have an explicitly stated policy that implies certain guarantees, and ii) keeping code and tests for backwards compatibility with Flink-1.1 does not offer much. On the contrary, I think that it leads to:

1) dead or ugly code in the codebase, e.g. deprecated class fields that could go away, and ugly if() branches (see the aligned window operators that were deprecated in 1.2 and are now normal windows), etc.
2) expensive tests (as, normally, they read from a savepoint)
3) binary files in the codebase for holding the aforementioned savepoints

My proposal for such a policy would be to offer backwards compatibility for one previous version.

This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows a clear "backwards compatibility" path when jumping versions (a user that goes from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 -> 1.3), while also allowing us to clean up the codebase a bit.

What do you think?

Kostas