[jira] [Created] (FLINK-6651) Clearing registeredStates map should be protected in SharedStateRegistry#clear

2017-05-20 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6651:
-

 Summary: Clearing registeredStates map should be protected in 
SharedStateRegistry#clear
 Key: FLINK-6651
 URL: https://issues.apache.org/jira/browse/FLINK-6651
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
  public void clear() {
registeredStates.clear();
  }
{code}
Everywhere else in SharedStateRegistry, a lock is taken on registeredStates 
before the map is accessed.
We should do the same in the clear() method.
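A minimal sketch of the fix, assuming registeredStates stays the monitor 
object that the other methods synchronize on:
{code}
  public void clear() {
    synchronized (registeredStates) {
      registeredStates.clear();
    }
  }
{code}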





[jira] [Created] (FLINK-6650) Fix Non-windowed group-aggregate error when using append-table mode.

2017-05-20 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6650:
--

 Summary: Fix Non-windowed group-aggregate error when using 
append-table mode.
 Key: FLINK-6650
 URL: https://issues.apache.org/jira/browse/FLINK-6650
 Project: Flink
  Issue Type: Sub-task
Reporter: sunjincheng
Assignee: sunjincheng


When I test the non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 
'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new 
StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. 
Output needs to handle update and delete changes.

at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at 
org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is that {{DataStreamGroupAggregate#producesUpdates}} returns true:
{code}
override def producesUpdates = true
{code}

I think, from the user's point of view, what users want is (for example):
Data:
{code}
val data = List(
  (1L, 1, "Hello"),
  (2L, 2, "Hello"),
  (3L, 3, "Hello"),
  (4L, 4, "Hello"),
  (5L, 5, "Hello"),
  (6L, 6, "Hello"),
  (7L, 7, "Hello World"),
  (8L, 8, "Hello World"),
  (20L, 20, "Hello World"))
{code}
* Case 1:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
.addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
1
3
6
10
15
21
28
36
56
{code}
* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
In fact, for Case 1 we can use unbounded OVER windows, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
.window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
.select('a.sum over 'w)
.toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result:
{code}
Same as Case 1
{code}
But after [FLINK-6649|https://issues.apache.org/jira/browse/FLINK-6649], 
OVER windows cannot express Case 1 with earlyFiring.

So I still think the non-windowed group-aggregate is not always an update 
table; the user should be able to decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.





Re: [DISCUSS] Backwards compatibility policy.

2017-05-20 Thread Kostas Kloudas
Hi Chesnay,

I believe that for APIs we already have a pretty clear policy with the 
annotations.
I was referring to savepoint- and state-related backwards compatibility.


> On May 20, 2017, at 7:20 PM, Chesnay Schepler  wrote:
> 
> I think it would be good to clarify what kind of backwards-compatibility 
> we're talking about here. As in, are we talking about APIs or savepoints?
> 
> On 20.05.2017 19:09, Kostas Kloudas wrote:
>> Hi all,
>> 
>> As we are getting closer to releasing Flink-1.3, I would like to open a 
>> discussion
>> on how far back we provide backwards compatibility.
>> 
>> The reason for opening the discussion is that i) for the users and for the
>> adoption of the project, it is good to have an explicitly stated policy 
>> that implies
>> certain guarantees, and ii) keeping code and tests for backwards 
>> compatibility with
>> Flink-1.1 does not offer much. On the contrary, I think that it leads to:
>> 
>> 1) dead or ugly code in the codebase, e.g. deprecated class fields that 
>> could go away and
>> ugly if() loops (see aligned window operators that were deprecated in 1.2 
>> and are now
>> normal windows), etc
>> 2) expensive tests (as, normally, they read from a savepoint)
>> 3) binary files in the codebase for holding the aforementioned savepoints
>> 
>> My proposal for such a policy would be to offer backwards compatibility for 
>> one previous version.
>> 
>> This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows 
>> a clear
>> "backwards compatibility" path when jumping versions (a user that goes
>> from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 
>> -> 1.3),
>> while also allowing us to clean up the codebase a bit.
>> 
>> What do you think?
>> 
>> Kostas
> 
> 



[jira] [Created] (FLINK-6649) Improve Non-window group aggregate with configurable `earlyFire`.

2017-05-20 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6649:
--

 Summary: Improve Non-window group aggregate with configurable 
`earlyFire`.
 Key: FLINK-6649
 URL: https://issues.apache.org/jira/browse/FLINK-6649
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng
Assignee: sunjincheng


Currently, the non-windowed group aggregate is early-firing at count(1), that 
is, every row emits an aggregate result. But sometimes users want to configure 
the count number (`early firing with count[N]`) to reduce the downstream 
pressure. This JIRA will make `earlyFiring` configurable for the non-windowed 
group aggregate.
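A minimal, Flink-independent sketch of the idea; all names here (e.g. 
{{EarlyFiringSum}}, {{earlyFireCount}}) are illustrative assumptions, not the 
actual Flink API:
{code}
import java.util.function.LongConsumer;

// Hypothetical helper, not Flink API: keeps a running sum and only emits
// a result every `earlyFireCount` rows instead of on every row.
public class EarlyFiringSum {

  private final long earlyFireCount;     // the N in `early firing with count[N]`
  private final LongConsumer downstream; // receives the emitted partial results
  private long sum;
  private long rowsSinceLastFire;

  public EarlyFiringSum(long earlyFireCount, LongConsumer downstream) {
    this.earlyFireCount = earlyFireCount;
    this.downstream = downstream;
  }

  public void accumulate(long value) {
    sum += value;
    // earlyFireCount = 1 reproduces today's per-row emission;
    // a larger N lets downstream see only every N-th partial result.
    if (++rowsSinceLastFire >= earlyFireCount) {
      downstream.accept(sum);
      rowsSinceLastFire = 0;
    }
  }
}
{code}
With earlyFireCount = 3, for example, downstream receives only every third 
partial sum, reducing the downstream pressure at the cost of less frequent 
updates.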





Re: [DISCUSS] Backwards compatibility policy.

2017-05-20 Thread Chesnay Schepler
I think it would be good to clarify what kind of 
backwards-compatibility we're talking about here. As in, are we talking 
about APIs or savepoints?


On 20.05.2017 19:09, Kostas Kloudas wrote:

Hi all,

As we are getting closer to releasing Flink-1.3, I would like to open a 
discussion
on how far back we provide backwards compatibility.

The reason for opening the discussion is that i) for the users and for the
adoption of the project, it is good to have an explicitly stated policy that 
implies
certain guarantees, and ii) keeping code and tests for backwards compatibility 
with
Flink-1.1 does not offer much. On the contrary, I think that it leads to:

1) dead or ugly code in the codebase, e.g. deprecated class fields that could 
go away and
ugly if() loops (see aligned window operators that were deprecated in 1.2 and 
are now
normal windows), etc
2) expensive tests (as, normally, they read from a savepoint)
3) binary files in the codebase for holding the aforementioned savepoints

My proposal for such a policy would be to offer backwards compatibility for one 
previous version.

This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows a 
clear
"backwards compatibility" path when jumping versions (a user that goes
from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 -> 
1.3),
while also allowing us to clean up the codebase a bit.

What do you think?

Kostas





[DISCUSS] Backwards compatibility policy.

2017-05-20 Thread Kostas Kloudas
Hi all,

As we are getting closer to releasing Flink-1.3, I would like to open a 
discussion
on how far back we provide backwards compatibility.

The reason for opening the discussion is that i) for the users and for the 
adoption of the project, it is good to have an explicitly stated policy that 
implies
certain guarantees, and ii) keeping code and tests for backwards compatibility 
with 
Flink-1.1 does not offer much. On the contrary, I think that it leads to:

1) dead or ugly code in the codebase, e.g. deprecated class fields that could 
go away and 
ugly if() loops (see aligned window operators that were deprecated in 1.2 and 
are now 
normal windows), etc
2) expensive tests (as, normally, they read from a savepoint)
3) binary files in the codebase for holding the aforementioned savepoints

My proposal for such a policy would be to offer backwards compatibility for one 
previous version. 

This means that 1.3 will be compatible with 1.2 (not 1.1). This still allows a 
clear 
"backwards compatibility" path when jumping versions (a user that goes 
from 1.1 to 1.3 can go initially 1.1 -> 1.2, take a savepoint, and then 1.2 -> 
1.3),
while also allowing us to clean up the codebase a bit. 

What do you think?

Kostas