Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-07-05 Thread sihua zhou


Hi Stephan,


Thank you very much for the reply and very happy for that!


I'm not sure whether I understood your idea correctly. Does it mean 1) we
should add a new operator with the Elastic Bloom Filter feature? Or 2) we
could support it the way the current (version <= 1.5) InternalTimerService is
supported, as an independent managed keyed state?


- If it means introducing a new operator to support the EBF, I'm not
sure whether it's easy to integrate some of the current SQL features
with the EBF, since they were built on other operators, e.g. the "stream join" and
the "distinct".


- If it means maintaining the EBF the way the current TimerService is maintained
(i.e. not integrated into the state backend), that is the implementation we are
currently using in production.


Please let me know if I totally misunderstood...


Best, Sihua
On 07/5/2018 04:34,Stephan Ewen wrote:
Hi Sihua!


Sorry for joining this discussion late.


I can see the benefit of such a feature and also see the technical merit. It is 
a nice piece of work and a good proposal.


I am wondering if there is a way to add such a technique as a "library 
operator", or whether it needs a deep integration into the runtime and the 
state backends.


The state backends currently have a few efforts going on, like state TTL,
making timers part of the state backends, asynchronous timer snapshots, scalable
timers in RocksDB, avoiding small-file fragmentation (and too much read/write
amplification) for RocksDB incremental snapshots, faster state recovery
efforts, etc.
This is a lot, and all these features have been on the list for quite a while, with
various users pushing for them.


If we can add such BloomFilters as an independent operator, quasi like a 
library or utility, then this is much easier to integrate, because it needs no 
coordination with the other State Backend work. It is also easier to review and 
merge, because it would be a new independent feature and not immediately affect 
all existing state functionality. If this interacts deeply with the existing state
backends, it touches some of the most critical and most active parts of the
system, which requires a lot of time from the core developers of those parts,
making the work harder and much slower to land.


What do you think about looking into whether the elastic bloom filters could be
added as a library operator?


Best,
Stephan




On Tue, Jun 12, 2018 at 4:35 PM, sihua zhou  wrote:
Hi,


I would like to add some more information about the Linked Filter
Nodes on each key group. The reason we need to maintain Linked Filter
Nodes is to handle data skew, which is also the most
challenging problem we need to overcome. Because we don't know how many
records will fall into each key group, we can't allocate a final Filter
Node up front, so we need to allocate Filter Nodes lazily: each
time we only allocate a small Filter Node
for the incoming records, and once it is filled we freeze it and allocate a new node
for the future incoming records. So we get a linked list of Filter Nodes on each key
group in which only the head node is writable and the rest are immutable.
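To make the structure more concrete, here is a minimal sketch of how the per-key-group
linked nodes could look. This is only an illustration, not the API from the design doc;
it assumes Guava's BloomFilter as the per-node filter and all class/field names are made up.

{code}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

/** One linked list of filter nodes per key group; only the head is writable. */
class LinkedBloomFilterNode {
    final BloomFilter<byte[]> filter;   // the small filter backing this node
    final long capacity;                // records the node can hold before freezing
    long count;                         // records inserted so far
    boolean frozen;                     // true once the node is full (then immutable)
    LinkedBloomFilterNode next;         // older, frozen nodes

    LinkedBloomFilterNode(long capacity, double fpp) {
        this.capacity = capacity;
        this.filter = BloomFilter.create(Funnels.byteArrayFunnel(), capacity, fpp);
    }
}

class KeyGroupFilter {
    private final long nodeCapacity;
    private final double fpp;
    private LinkedBloomFilterNode head;

    KeyGroupFilter(long nodeCapacity, double fpp) {
        this.nodeCapacity = nodeCapacity;
        this.fpp = fpp;
        this.head = new LinkedBloomFilterNode(nodeCapacity, fpp);
    }

    void add(byte[] record) {
        if (head.count >= head.capacity) {
            // the head is full: freeze it and lazily allocate a new writable node
            head.frozen = true;
            LinkedBloomFilterNode newHead = new LinkedBloomFilterNode(nodeCapacity, fpp);
            newHead.next = head;
            head = newHead;
        }
        head.filter.put(record);
        head.count++;
    }

    boolean mightContain(byte[] record) {
        // the writable head plus all frozen nodes have to be checked
        for (LinkedBloomFilterNode node = head; node != null; node = node.next) {
            if (node.filter.mightContain(record)) {
                return true;
            }
        }
        return false;
    }
}
{code}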


Best, Sihua

On 06/12/2018 16:22,sihua zhou wrote:
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a 
TTL for the Elastic Filter to recycle the memory resource.


For every list of Linked BloomFilter Nodes, only the head node is writable; the other
nodes are all full and therefore immutable (read-only; we implement the
relaxed TTL based on this property). Even if we didn't need to remove
nodes, we would still always insert new data into the current node (the head
node), because nodes are allocated lazily (to handle data skew): each node
can only store "a part" of the data, and once the current node is full, we allocate
a new head node.
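As an illustration only (not the design doc's API), the relaxed TTL could piggy-back on
the immutability of the frozen nodes: if each node also records the time of its last
insert, a whole frozen node (and everything older behind it) can be dropped during a
lookup once it is older than the TTL. Extending the node sketch shown earlier in this
thread, under those assumptions:

{code}
// Sketch only: assumes the fields head / nodeCapacity / fpp from the earlier
// sketch, and that every node additionally records a lastInsertTime.
boolean mightContainWithTtl(byte[] record, long ttlMillis) {
    final long now = System.currentTimeMillis();
    LinkedBloomFilterNode prev = null;
    for (LinkedBloomFilterNode node = head; node != null; node = node.next) {
        if (node.frozen && now - node.lastInsertTime > ttlMillis) {
            // every record in this node (and in all older nodes behind it) is
            // at least ttlMillis old, so the whole expired tail can be unlinked
            if (prev != null) {
                prev.next = null;
            }
            return false;
        }
        if (node.filter.mightContain(record)) {
            return true;
        }
        prev = node;
    }
    return false;
}
{code}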


Concerning the cuckoo filters, I also think they seem to be the most appropriate
in theory. But there are some reasons why I prefer to implement this based on
BF for the first iteration.


- I didn't find an open source lib that provides a "stable" cuckoo filter;
we would probably need to implement it ourselves, which is not trivial work.


- The biggest attraction of the cuckoo filter is that it supports deletion,
but since the cuckoo filter is a dense data structure, we can't store the
timestamp with the record in it; we would need to depend on something
extra (e.g. a timer) to make use of its deletion, and that overhead may not be cheap.


- Whether it's a cuckoo filter or a bloom filter, both act as the "smallest
storage unit" in the "Elastic Filter"; after we provide an implementation based
on the Bloom Filter, it is easy to extend it to the cuckoo filter (see the interface sketch below).
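Just to illustrate the separation (this is not the interface from the design doc; all
names are hypothetical), the "smallest storage unit" could hide behind a tiny interface
so that a cuckoo-filter-backed unit can be swapped in later:

{code}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

/** Hypothetical "smallest storage unit" abstraction. */
interface ApproximateFilter {
    void add(byte[] record);
    boolean mightContain(byte[] record);
    boolean isFull();
}

/** First iteration: back the unit with a plain bloom filter (here Guava's). */
class BloomFilterUnit implements ApproximateFilter {
    private final BloomFilter<byte[]> filter;
    private final long capacity;
    private long count;

    BloomFilterUnit(long capacity, double fpp) {
        this.capacity = capacity;
        this.filter = BloomFilter.create(Funnels.byteArrayFunnel(), capacity, fpp);
    }

    @Override
    public void add(byte[] record) {
        filter.put(record);
        count++;
    }

    @Override
    public boolean mightContain(byte[] record) {
        return filter.mightContain(record);
    }

    @Override
    public boolean isFull() {
        return count >= capacity;
    }
}

// A CuckooFilterUnit could later implement the same interface in a second
// iteration, leaving the surrounding "elastic" logic (linked nodes, key
// groups, TTL) untouched.
{code}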


How about providing the Elastic Filter based on BF as the first iteration and
the version based on the cuckoo filter as a second iteration? What do
you think?


Best, Sihua

[jira] [Created] (FLINK-9661) TTL state should support to do time shift after restoring from checkpoint( savepoint).

2018-06-25 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9661:
-

 Summary: TTL state should support to do time shift after restoring 
from checkpoint( savepoint).
 Key: FLINK-9661
 URL: https://issues.apache.org/jira/browse/FLINK-9661
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The initial version of the TTL state appends the expiration timestamp to the
state record and checks it with the condition
{{expired_timestamp <= current_time}} when accessing the state; if the condition
is true the record is expired, otherwise it is still alive. This works fine in
most cases, but in some cases we need to do a time shift, otherwise it may
produce unexpected results when using processing time. I roughly describe two
such cases below.

- when restoring the job from the savepoint

For example, the user sets the TTL of the state to 2h. If they trigger a
savepoint and only restore the job from that savepoint more than 2h later
(maybe something prevented them from restoring the job quickly), then all of
the restored job's previous state data is already expired.

- when the job spends a long time recovering from a failure

For example, many jobs are running on a YARN session cluster, and the
cluster is configured to use a DFS to store the checkpoint data. Unfortunately,
the DFS hits a strange problem which makes the jobs on the
cluster loop in recovery-fail-recovery-fail... The devs spend some
time addressing the DFS issue and the jobs start working properly again, but if
"{{system down time >= TTL}}" then the jobs' previous state data will all be
expired in this case.

To avoid the problems above, we need to do a time shift after the job
recovers from a checkpoint or savepoint. A possible approach is outlined in
[6186|https://github.com/apache/flink/pull/6186].
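For illustration only (this is not the implementation from PR 6186; the names are made
up), the time shift can be thought of as subtracting the accumulated down time from the
clock that the TTL check uses:

{code:java}
/** Shift the clock used by the TTL check by the down time between the last
 *  checkpoint/savepoint and the restore, so state that was still alive when
 *  the snapshot was taken does not expire just because recovery was slow. */
class ShiftedTtlClock {
    private long shiftMillis; // accumulated down time across restores

    /** Called once after restore, with the wall-clock time stored in the snapshot. */
    void onRestore(long snapshotWallClockMillis) {
        shiftMillis += System.currentTimeMillis() - snapshotWallClockMillis;
    }

    /** The "current_time" used in the check: expired_timestamp <= now(). */
    long now() {
        return System.currentTimeMillis() - shiftMillis;
    }

    boolean isExpired(long expiredTimestamp) {
        return expiredTimestamp <= now();
    }
}
{code}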



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Multiple kafka consumers

2018-06-25 Thread sihua zhou
Hi Amol,


I think if you set the parallelism of the source node equal to the number of
partitions of the Kafka topic, you can have one Kafka consumer per partition
in your job (a small example follows). But if the number of Kafka partitions
is dynamic, the 1:1 relationship might break. I think maybe @Gordon (CC'd)
could give you more useful information.
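A small self-contained example of the first point (Flink 1.5-era API; the topic name,
partition count and Kafka settings are only placeholders):

{code}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class OneConsumerPerPartition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        int numPartitions = 8; // assumed to equal the topic's partition count

        // one source subtask per Kafka partition, as long as the two counts match
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props))
                .setParallelism(numPartitions);

        stream.print();
        env.execute("one consumer per partition");
    }
}
{code}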


Best, Sihua






On 06/25/2018 17:19,Amol S - iProgrammer wrote:
I have asked the same kind of question on Stack Overflow as well.

Please answer it ASAP.

https://stackoverflow.com/questions/51020018/partition-specific-flink-kafka-consumer

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 


On Mon, Jun 25, 2018 at 2:09 PM, Amol S - iProgrammer 




Re: [ANNOUNCE] New committer: Sihua Zhou

2018-06-22 Thread sihua zhou


Thank you everyone! 
This is definitely an honour for me! It is very nice to work with you and I got 
a lot from you guys!


Thanks, 
Sihua






On 06/22/2018 22:31,zhenya Sun wrote:
Congratulations


from my iphone!
On 06/22/2018 21:17, Till Rohrmann wrote:
Hi everybody,

On behalf of the PMC I am delighted to announce Sihua Zhou as a new Flink
committer!

Sihua has been an active member of our community for several months. Among
other things, he helped develop FLIP-6, improved Flink's state backends
and fixed a lot of major and minor issues. Moreover, he is helping the
Flink community by reviewing PRs, answering users on the mailing list and
proposing new features.

Please join me in congratulating Sihua for becoming a Flink committer!

Cheers,
Till


[jira] [Created] (FLINK-9633) Flink doesn't use the Savepoint path's filesystem to create the OutputStream on Task.

2018-06-21 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9633:
-

 Summary: Flink doesn't use the Savepoint path's filesystem to
create the OutputStream on Task.
 Key: FLINK-9633
 URL: https://issues.apache.org/jira/browse/FLINK-9633
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


Currently, Flink uses the Savepoint's filesystem to create the meta output
stream in CheckpointCoordinator (JM side), but in StreamTask (TM side) it uses
the Checkpoint's filesystem to create the checkpoint data output stream. When
the Savepoint and Checkpoint are on different filesystems this becomes
problematic.
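As an illustration of the direction (not the actual fix), the stream on the task side
could be created through the filesystem resolved from the savepoint path itself:

{code:java}
import java.io.IOException;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

class SavepointStreams {
    /** Resolve the FileSystem from the savepoint path, not from the configured
     *  checkpoint filesystem, so JM and TM write through the same filesystem. */
    static FSDataOutputStream createSavepointStream(Path savepointFilePath) throws IOException {
        FileSystem fs = savepointFilePath.getFileSystem();
        return fs.create(savepointFilePath, FileSystem.WriteMode.NO_OVERWRITE);
    }
}
{code}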



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Ordering of stream from different kafka partitions

2018-06-20 Thread sihua zhou




Hi,


I think a global ordering is a bit impractical in production, but in theory
you can still do it. You need to


- First, fix the operator's parallelism to 1 (except for the source node).
- If you want to sort the records within a bounded time, you can keyBy() a
constant, window the stream, buffer the records in state and sort them
when the window is triggered; the code could look as follows.
{code}
sourceStream
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                Map timeStamp = (Map) element.get("ts");
                return (long) timeStamp.get("value");
            }
        })
    .keyBy((record) -> 0) // key by a constant value
    .window(...)
    .process(new OrderTheRecords())
    .setParallelism(1);
{code}


- If you want to sort the records truly globally (non-windowed), you could
keyBy a constant, store the records in state and sort them in the
process() function for every incoming record. And if you want a perfectly correct
output, you may need to do retraction (because every incoming record
may change the global order); the code could look as follows.
{code}
sourceStream
    .setParallelism(4)
    .keyBy((record) -> 0) // key by a constant value
    .process(new OrderTheRecords())
    .setParallelism(1);
{code}
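Just as an illustration of what OrderTheRecords could look like for the windowed
variant (assuming an Event type with a long timestamp getter; this is not from any
existing code):

{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OrderTheRecords
        extends ProcessWindowFunction<Event, Event, Integer, TimeWindow> {

    @Override
    public void process(Integer key, Context ctx, Iterable<Event> elements, Collector<Event> out) {
        // the window has already buffered the records in state; sort and emit them
        List<Event> buffered = new ArrayList<>();
        for (Event e : elements) {
            buffered.add(e);
        }
        buffered.sort(Comparator.comparingLong(Event::getTimestamp)); // assumed getter
        for (Event e : buffered) {
            out.collect(e);
        }
    }
}
{code}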




In both cases, you need to fix the parallelism of the OrderTheRecords operator
to 1, which makes your job non-scalable and turns that operator into the
bottleneck. So a global ordering may not be practical in production (though if
the source's TPS is very low, it might be).


Best, Sihua


On 06/20/2018 15:36,Amol S - iProgrammer wrote:
Hello Andrey,

Thanks for your quick response. I have tried your code above but it
doesn't suit my requirement. I need global ordering of my records while using
multiple Kafka partitions. Please suggest a workaround for this. As
mentioned in this
<https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams>
link, is it possible to buffer data for some amount of time and then perform a
sort on it, or is there any other way?

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 


On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin 
wrote:

Hi Amol,

I think you could try (based on your stack overflow code)
org.apache.flink.streaming.api.functions.timestamps.
BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Event> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                Map timeStamp = (Map) element.get("ts");
                return (long) timeStamp.get("value");
            }
        });

In general, if records are sorted by anything in a Kafka partition,
parallel subtask of Flink Kafka source will consume these records and push
to user operators in the same order. There is maximum one consuming subtask
per Kafka partition but several partitions might be served by one subtask.
It means that there are the same guarantees as in Kafka: ordering per
partition but not across them, including no global ordering.

The case of global and per window ordering is already described by Sihua.
The global ordering might be impractical in case of distributed system.

If a subtask of your Flink operator consumes from several partitions or
there is no ordering at all, you can try the above approach with
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering
across these partitions per key or all records. It is similar to ordering
within a window. It means there could still be late records coming after
out of orderness period of time which can break the ordering. This operator
buffers records in state to maintain the order but only for out of
orderness period of time which also increases latency.

Cheers,
Andrey

On 19 Jun 2018, at 14:12, sihua zhou  wrote:



Hi Amol,


I'm not sure whether this is possible, especially when you need to
process the records with multiple parallelism.


IMO, in theory, we can only get an ordered stream when there is a single
Kafka partition and we process it with a single parallelism in Flink. Even
in this case, if you only want to order the records within a window, then you
need to store the

[jira] [Created] (FLINK-9622) DistributedCacheDfsTest failed on travis

2018-06-20 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9622:
-

 Summary: DistributedCacheDfsTest failed on travis
 Key: FLINK-9622
 URL: https://issues.apache.org/jira/browse/FLINK-9622
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


DistributedCacheDfsTest#testDistributeFileViaDFS() failed (flaky) on Travis.

instance: https://api.travis-ci.org/v3/job/394399700/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9619:
-

 Summary: Always close the task manager connection when the 
container is completed in YarnResourceManager
 Key: FLINK-9619
 URL: https://issues.apache.org/jira/browse/FLINK-9619
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


We should always eagerly close the connection with task manager when the 
container is completed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re:Ordering of stream from different kafka partitions

2018-06-19 Thread sihua zhou


Hi Amol,


I'm not sure whether this is possible, especially when you need to process
the records with multiple parallelism.


IMO, in theory, we can only get an ordered stream when there is a single
Kafka partition and we process it with a single parallelism in Flink. Even in
this case, if you only want to order the records within a window, then you need to
store the records in state and order them when the window is triggered.
But if you want to order the records with a single `keyBy()` (non-windowed), I
think that's probably impossible in practice, because you need to store all
the incoming records and re-order all of them for every incoming record, and
you also need to send a retraction message for the previous result (because every
incoming record might change the global order of the records).


Best, Sihua
On 06/19/2018 19:19,Amol S - iProgrammer wrote:
Hi,

I have used the Flink streaming API in my application, where the source of
the stream is Kafka. My Kafka producer publishes data in ascending order
of time to different Kafka partitions, and the consumer reads data from
these partitions. However, some Kafka partitions may be slow due to some
operation and produce late results. Is there any way to maintain order in
this stream even though the data arrive out of order? I have tried
BoundedOutOfOrdernessTimestampExtractor but it didn't serve the purpose.
While digging into this problem I came across your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
and tried to implement it, but it didn't work. I also tried the Table
API's order by, but it seems orderBy is not supported in Flink 1.5.
Please suggest a workaround for this.

I have raised same concern on stack overflow

https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 



[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9613:
-

 Summary: YARNSessionCapacitySchedulerITCase failed because 
YarnTestBase.checkClusterEmpty()
 Key: FLINK-9613
 URL: https://issues.apache.org/jira/browse/FLINK-9613
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The test YARNSessionCapacitySchedulerITCase failed on Travis because of
YarnTestBase.checkClusterEmpty().

https://api.travis-ci.org/v3/job/394017104/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9601) Snapshot of CopyOnWriteStateTable will fail when the number of records is more than MAXIMUM_CAPACITY

2018-06-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9601:
-

 Summary: Snapshot of CopyOnWriteStateTable will fail when the
number of records is more than MAXIMUM_CAPACITY
 Key: FLINK-9601
 URL: https://issues.apache.org/jira/browse/FLINK-9601
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


In short, the problem is that we reuse `snapshotData` as the output array
when partitioning the input data, but the max length of `snapshotData` is
`1 << 30`. So when there are more than `1 << 30` records in the
`CopyOnWriteStateTable` (e.g. `(1 << 30) + 1`), the check
`Preconditions.checkState(partitioningDestination.length >= numberOfElements);`
can fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-06-12 Thread sihua zhou
Hi,


I would like to add some more information about the Linked Filter
Nodes on each key group. The reason we need to maintain Linked Filter
Nodes is to handle data skew, which is also the most
challenging problem we need to overcome. Because we don't know how many
records will fall into each key group, we can't allocate a final Filter
Node up front, so we need to allocate Filter Nodes lazily: each
time we only allocate a small Filter Node
for the incoming records, and once it is filled we freeze it and allocate a new node
for the future incoming records. So we get a linked list of Filter Nodes on each key
group in which only the head node is writable and the rest are immutable.


Best, Sihua
On 06/12/2018 16:22,sihua zhou wrote:
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a 
TTL for the Elastic Filter to recycle the memory resource.


For every list of Linked BloomFilter Nodes, only the head node is writable; the other
nodes are all full and therefore immutable (read-only; we implement the
relaxed TTL based on this property). Even if we didn't need to remove
nodes, we would still always insert new data into the current node (the head
node), because nodes are allocated lazily (to handle data skew): each node
can only store "a part" of the data, and once the current node is full, we allocate
a new head node.


Concerning the cuckoo filters, I also think they seem to be the most appropriate
in theory. But there are some reasons why I prefer to implement this based on
BF for the first iteration.


- I didn't find an open source lib that provides a "stable" cuckoo filter;
we would probably need to implement it ourselves, which is not trivial work.


- The biggest attraction of the cuckoo filter is that it supports deletion,
but since the cuckoo filter is a dense data structure, we can't store the
timestamp with the record in it; we would need to depend on something
extra (e.g. a timer) to make use of its deletion, and that overhead may not be cheap.


- Whether it's a cuckoo filter or a bloom filter, both act as the "smallest
storage unit" in the "Elastic Filter"; after we provide an implementation based
on the Bloom Filter, it is easy to extend it to the cuckoo filter.


How about providing the Elastic Filter based on BF as the first iteration and
the version based on the cuckoo filter as a second iteration? What do
you think?


Best, Sihua
On 06/12/2018 15:43,Fabian Hueske wrote:
Hi Sihua,


Sorry for not replying earlier.

I have one question left. If I understood the design of the linked Bloomfilter 
nodes right, users would need to configure a TTL to be able to remove a node.

When nodes are removed, we would need to insert every key into the current node 
which would not be required if we don't remove nodes, right?


From the small summary of approximated filters, cuckoo filters seem to be most 
appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides potentially 
slower inserts)?


Best, Fabian







2018-06-12 9:29 GMT+02:00 sihua zhou :

Hi,


No more feedback these days... I guess it's because you are all too busy. Since
I didn't receive any negative feedback and there is already some
positive feedback, I want to implement this *Elastic Bloom Filter* based on
the current design doc (because I have time to do it now). Even though I think the
design can definitely still be improved, maybe we can discuss the improvements
better based on the code, and I believe most of the code could be
cherry-picked for the "final implementation". Does anyone object to this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou wrote:
Hi,


Sorry, but pinging for more feedback on this proposal...
Even negative feedback is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou wrote:
Hi,


I did a survey of the variants of the Bloom Filter and the Cuckoo Filter these
days. In the end, I found 3 of them that may be suitable for our purpose.


1. standard bloom filter (we have implemented our version based on this and used it in
production with good results)
2. cuckoo filter, also a very good, space-efficient data structure that supports
fast queries (even faster than BF, though inserts may be a little slower than BF);
additionally it supports a delete() operation.
3. counting bloom filter, a variant of BF that supports a delete() operation, but it
costs 4-5x more memory than the standard bloom filter (so I'm not sure whether it
is practical).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom
Filter"; we can define a general interface and provide different
implementations of the "storage unit" based on different filters if we want. Maybe I
should change the PROPOSAL name to "Introduce Elastic Filter For Flink",
th

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-06-12 Thread sihua zhou
Hi Fabian,


Thanks a lot for your reply, you are right that users would need to configure a 
TTL for the Elastic Filter to recycle the memory resource.


For every list of Linked BloomFilter Nodes, only the head node is writable; the other
nodes are all full and therefore immutable (read-only; we implement the
relaxed TTL based on this property). Even if we didn't need to remove
nodes, we would still always insert new data into the current node (the head
node), because nodes are allocated lazily (to handle data skew): each node
can only store "a part" of the data, and once the current node is full, we allocate
a new head node.


Concerning the cuckoo filters, I also think they seem to be the most appropriate
in theory. But there are some reasons why I prefer to implement this based on
BF for the first iteration.


- I didn't find an open source lib that provides a "stable" cuckoo filter;
we would probably need to implement it ourselves, which is not trivial work.


- The biggest attraction of the cuckoo filter is that it supports deletion,
but since the cuckoo filter is a dense data structure, we can't store the
timestamp with the record in it; we would need to depend on something
extra (e.g. a timer) to make use of its deletion, and that overhead may not be cheap.


- Whether it's a cuckoo filter or a bloom filter, both act as the "smallest
storage unit" in the "Elastic Filter"; after we provide an implementation based
on the Bloom Filter, it is easy to extend it to the cuckoo filter.


How about providing the Elastic Filter based on BF as the first iteration and
the version based on the cuckoo filter as a second iteration? What do
you think?


Best, Sihua
On 06/12/2018 15:43,Fabian Hueske wrote:
Hi Sihua,


Sorry for not replying earlier.

I have one question left. If I understood the design of the linked Bloomfilter 
nodes right, users would need to configure a TTL to be able to remove a node.

When nodes are removed, we would need to insert every key into the current node 
which would not be required if we don't remove nodes, right?


From the small summary of approximated filters, cuckoo filters seem to be most 
appropriate as they also support deletes.
Are you aware of any downsides compared to bloom filters (besides potentially 
slower inserts)?


Best, Fabian







2018-06-12 9:29 GMT+02:00 sihua zhou :

Hi,


No more feedback these days... I guess it's because you are all too busy. Since
I didn't receive any negative feedback and there is already some
positive feedback, I want to implement this *Elastic Bloom Filter* based on
the current design doc (because I have time to do it now). Even though I think the
design can definitely still be improved, maybe we can discuss the improvements
better based on the code, and I believe most of the code could be
cherry-picked for the "final implementation". Does anyone object to this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou wrote:
Hi,


Sorry, but pinging for more feedback on this proposal...
Even negative feedback is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou wrote:
Hi,


I did a survey of the variants of the Bloom Filter and the Cuckoo Filter these
days. In the end, I found 3 of them that may be suitable for our purpose.


1. standard bloom filter (we have implemented our version based on this and used it in
production with good results)
2. cuckoo filter, also a very good, space-efficient data structure that supports
fast queries (even faster than BF, though inserts may be a little slower than BF);
additionally it supports a delete() operation.
3. counting bloom filter, a variant of BF that supports a delete() operation, but it
costs 4-5x more memory than the standard bloom filter (so I'm not sure whether it
is practical).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom
Filter"; we can define a general interface and provide different
implementations of the "storage unit" based on different filters if we want. Maybe I
should change the PROPOSAL name to "Introduce Elastic Filter For Flink".
The idea of the approach I outlined in the doc is very similar to the paper
"Optimization and Applications of Dynamic Bloom
Filters" (http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814); compared
to the paper, the approach I outlined could have better query performance and
also supports the RELAXED TTL, so maybe it helps to understand the design doc.
Looking forward to any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou wrote:
Hi,
Thanks for your suggestions @Elias! I had a brief look at the "Cuckoo Filter" and
the "Golomb Compressed Sequence"; my first impression is that the "Golomb
Compressed Sequence" may not be a good choice, because it seems to require
non-constant lookup time, but the Cuckoo Filter may be a good cho

Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-06-12 Thread sihua zhou
Hi,


No more feedback these days... I guess it's because you are all too busy. Since
I didn't receive any negative feedback and there is already some
positive feedback, I want to implement this *Elastic Bloom Filter* based on
the current design doc (because I have time to do it now). Even though I think the
design can definitely still be improved, maybe we can discuss the improvements
better based on the code, and I believe most of the code could be
cherry-picked for the "final implementation". Does anyone object to this?


Best, Sihua




On 06/6/2018 22:02,sihua zhou wrote:
Hi,


Sorry, but pinging for more feedback on this proposal...
Even negative feedback is highly appreciated!


Best, Sihua






On 05/30/2018 13:19,sihua zhou wrote:
Hi,


I did a survey of the variants of the Bloom Filter and the Cuckoo Filter these
days. In the end, I found 3 of them that may be suitable for our purpose.


1. standard bloom filter (we have implemented our version based on this and used it in
production with good results)
2. cuckoo filter, also a very good, space-efficient data structure that supports
fast queries (even faster than BF, though inserts may be a little slower than BF);
additionally it supports a delete() operation.
3. counting bloom filter, a variant of BF that supports a delete() operation, but it
costs 4-5x more memory than the standard bloom filter (so I'm not sure whether it
is practical).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom
Filter"; we can define a general interface and provide different
implementations of the "storage unit" based on different filters if we want. Maybe I
should change the PROPOSAL name to "Introduce Elastic Filter For Flink".
The idea of the approach I outlined in the doc is very similar to the paper
"Optimization and Applications of Dynamic Bloom
Filters" (http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814); compared
to the paper, the approach I outlined could have better query performance and
also supports the RELAXED TTL, so maybe it helps to understand the design doc.
Looking forward to any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou wrote:
Hi,
Thanks for your suggestions @Elias! I had a brief look at the "Cuckoo Filter" and
the "Golomb Compressed Sequence"; my first impression is that the "Golomb
Compressed Sequence" may not be a good choice, because it seems to require
non-constant lookup time, but the Cuckoo Filter may be a good choice, so I should
definitely have a deeper look at it.


Besides, to me, all of these filters seem to be "variants" of the bloom
filter (which is the smallest unit for storing data in the current design). The
main challenge for introducing the BF into Flink is the data skew problem (which is
a common phenomenon in production); could you maybe also have a look at the
solution for this problem that I posted in the Google doc
https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
? It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy wrote:
I would suggest you consider alternative data structures: a Cuckoo
Filter or a Golomb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske :
IMO, such a feature would be very interesting. However, my concern with
Bloom Filters
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?




Re: [DISCUSS] Flink 1.6 features

2018-06-09 Thread sihua zhou
Hi Stephan,


Thanks very much for your response! That gave me the confidence to continue
working on the Elastic Filter. We have already implemented it (based on
1.3.2) and used it in production for several months, but if there is a committer
willing to guide me in bringing it into Flink, I will be very grateful (it's not a
trivial piece of work, and IMO our current implementation based on 1.3.2 is a bit
hacky, so a design review would be really helpful).


Best, Sihua




On 06/9/2018 04:31,Stephan Ewen wrote:
Hi all!


Thanks for the discussion and good input. Many suggestions fit well with the 
proposal above.


Please bear in mind that with a time-based release model, we would release 
whatever is mature by end of July.
The good thing is we could schedule the next release not too far after that, so 
that the features that did not quite make it will not be delayed too long.
In some sense, you could read this as as "what to do first" list, rather than 
"this goes in, other things stay out".


Some thoughts on some of the suggestions


Kubernetes integration: An opaque integration with Kubernetes should be 
supported through the "as a library" mode. For a deeper integration, I know 
that some committers have experimented with some PoC code. I would let Till add 
some thoughts, he has worked the most on the deployment parts recently.



Per partition watermarks with idleness: Good point, could one implement that on 
the current interface, with a periodic watermark extractor?
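(Just to sketch the idea on the 1.5-era interface, with a made-up Event type; this is
not a complete solution for FLINK-5479: an assigner could let the watermark advance
with processing time once the subtask has seen no elements for a while.)

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private final long maxOutOfOrderness;
    private final long idleTimeout;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastElementWallClock = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrderness, long idleTimeout) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
    }

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long ts = element.getTimestamp(); // assumed getter
        maxTimestamp = Math.max(maxTimestamp, ts);
        lastElementWallClock = System.currentTimeMillis();
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastElementWallClock > idleTimeout) {
            // no elements for a while: advance the watermark with wall-clock time
            return new Watermark(now - maxOutOfOrderness);
        }
        return maxTimestamp == Long.MIN_VALUE
                ? new Watermark(Long.MIN_VALUE)
                : new Watermark(maxTimestamp - maxOutOfOrderness);
    }
}
{code}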


Atomic cancel-with-savepoint: Agreed, this is important. Making this work with 
all sources needs a bit more work. We should have this in the roadmap.


Elastic Bloomfilters: This seems like an interesting new feature - the above 
suggested feature set was more about addressing some longer standing 
issues/requests. However, nothing should prevent contributors to work on that.


Best,
Stephan




On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science]  
wrote:


+1 on https://issues.apache.org/jira/browse/FLINK-5479



From: Rico Bergmann 
Sent: Tuesday, June 5, 2018 9:12:00 PM
To: Hao Sun
Cc:dev@flink.apache.org; user
Subject: Re: [DISCUSS] Flink 1.6 features
 
+1 on K8s integration 





Am 06.06.2018 um 00:01 schrieb Hao Sun :


adding my vote to K8S Job mode, maybe it is this?
> Smoothen the integration in Container environment, like "Flink as a Library", 
> and easier integration with Kubernetes services and other proxies.





On Mon, Jun 4, 2018 at 11:01 PM Ben Yan  wrote:

Hi Stephan,

Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]  (Per-partition 
watermarks in FlinkKafkaConsumer should consider idle partitions) be included 
in 1.6? As we are seeing more users with this issue on the mailing lists.



Thanks.
Ben


2018-06-05 5:29 GMT+08:00 Che Lui Shum :


Hi Stephan,

Will FLINK-7129 (Support dynamically changing CEP patterns) be included in 1.6? 
There were discussions about possibly including it in 1.6:
http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e

Thanks,
Shirley Shum

Stephan Ewen ---06/04/2018 02:21:47 AM---Hi Flink Community! The release of 
Apache Flink 1.5 has happened (yay!) - so it is a good time

From: Stephan Ewen 
To: dev@flink.apache.org, user 
Date: 06/04/2018 02:21 AM
Subject: [DISCUSS] Flink 1.6 features





Hi Flink Community!

The release of Apache Flink 1.5 has happened (yay!) - so it is a good time to 
start talking about what to do for release 1.6.

== Suggested release timeline ==

I would propose to release around end of July (that is 8-9 weeks from now).

The rationale behind that: There was a lot of effort in release testing 
automation (end-to-end tests, scripted stress tests) as part of release 1.5. 
You may have noticed the big set of new modules under "flink-end-to-end-tests" 
in the Flink repository. It delayed the 1.5 release a bit, and needs to 
continue as part of the coming release cycle, but should help make releasing 
more lightweight from now on.

(Side note: There are also some nightly stress tests that we created and run at 
data Artisans, and where we are looking whether and in which way it would make 
sense to contribute them to Flink.)

== Features and focus areas ==

We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new 
network stack, recovery, SQL joins and client, ... Following something like a 
"tick-tock-model", I would suggest to focus the next release more on 
integrations, tooling, and reducing user friction. 

Of course, this does not mean that no other pull request gets 

[jira] [Created] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0

2018-06-07 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9546:
-

 Summary: The heartbeatTimeoutIntervalMs of HeartbeatMonitor should 
be larger than 0
 Key: FLINK-9546
 URL: https://issues.apache.org/jira/browse/FLINK-9546
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Sihua Zhou
Assignee: Sihua Zhou


The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
currently the arg check looks like
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}

it should be
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re:[DISCUSS] Flink 1.6 features

2018-06-04 Thread sihua zhou
Hi Stephan,


could you please also consider the "Elastic Filter " feature discussioned in 
thread 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Introduce-Elastic-Bloom-Filter-For-Flink-td22430.html
 ?


Best, Sihua




On 06/4/2018 17:21,Stephan Ewen wrote:
Hi Flink Community!

The release of Apache Flink 1.5 has happened (yay!) - so it is a good time
to start talking about what to do for release 1.6.

*== Suggested release timeline ==*

I would propose to release around *end of July* (that is 8-9 weeks from
now).

The rationale behind that: There was a lot of effort in release testing
automation (end-to-end tests, scripted stress tests) as part of release
1.5. You may have noticed the big set of new modules under
"flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
release a bit, and needs to continue as part of the coming release cycle,
but should help make releasing more lightweight from now on.

(Side note: There are also some nightly stress tests that we created and
run at data Artisans, and where we are looking whether and in which way it
would make sense to contribute them to Flink.)

*== Features and focus areas ==*

We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
network stack, recovery, SQL joins and client, ... Following something like
a "tick-tock-model", I would suggest to focus the next release more on
integrations, tooling, and reducing user friction.

Of course, this does not mean that no other pull request gets reviewed, and
no other topic will be examined - it is simply meant as a help to
understand where to expect more activity during the next release cycle.
Note that these are really the coarse focus areas - don't read this as a
comprehensive list.

This list is my first suggestion, based on discussions with committers,
users, and mailing list questions.

- Support Java 9 and Scala 2.12

- Smoothen the integration in Container environment, like "Flink as a
Library", and easier integration with Kubernetes services and other proxies.

- Polish the remaining parts of the FLIP-6 rewrite

- Improve state backends with asynchronous timer snapshots, efficient
timer deletes, state TTL, and broadcast state support in RocksDB.

- Extends Streaming Sinks:
- Bucketing Sink should support S3 properly (compensate for eventual
consistency), work with Flink's shaded S3 file systems, and efficiently
support formats that compress/index across individual rows (Parquet, ORC,
...)
- Support ElasticSearch's new REST API

- Smoothen State Evolution to support type conversion on snapshot restore

- Enhance Stream SQL and CEP
- Add support for "update by key" Table Sources
- Add more table sources and sinks (Kafka, Kinesis, Files, K/V stores)
- Expand SQL client
- Integrate CEP and SQL, through MATCH_RECOGNIZE clause
- Improve CEP Performance of SharedBuffer on RocksDB


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-06-04 Thread sihua zhou
Hi andrey,


Thanks for this doc! TBH, I personally prefer the approach you outlined in the
doc over the previous one that was purely based on timers. I think this approach
looks very similar to the approach I outlined in this thread before, so it
still faces the challenges that @Bowen outlined, but I think maybe we can try to
overcome them. I will have a closer look at the doc you posted and leave some
comments if I can.


Best, Sihua






On 06/4/2018 15:27,Andrey Zagrebin wrote:
Hi everybody,

We have been recently brainstorming ideas around state TTL in Flink
and compiled our thoughts in the following design doc:
https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM 
<https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM>
Thanks to the community, many things in the doc are based on the previous 
discussions.

As there are a lot of TTL requirements related to data privacy regulations 
(quite hot topic in EU)
and better cleanup strategies sometimes need more research and maybe POCs,
we suggest to start with implementing TTL API itself
and rather without major changes in current state performance.

In a nutshell, the approach only requires appending expiration timestamp bytes
to each state value/entry (a minimal sketch follows the list below).
Firstly, just block access to expired state and clean it up on explicit access,
then gradually adopt cleanup strategies with different guarantees to address
space concerns better,
including:
- filter out expired state during checkpointing process
- exact cleanup with timer service (though still requires double storing of 
keys in both backends)
- piggy-back rocksdb compaction using custom filter by TTL (similar to 
cassandra custom filter)
- cleanup of heap regions around randomly accessed bucket
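A minimal sketch of the basic idea above (illustration only, not the API from the
design doc; all names are made up):

{code}
/** Each stored value carries its expiration timestamp; reads check it, treat
 *  expired entries as absent and clean them up lazily on access. */
class TtlValue<V> {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long ttlMillis) {
        this.value = value;
        this.expirationTimestamp = System.currentTimeMillis() + ttlMillis;
    }

    boolean isExpired(long now) {
        return expirationTimestamp <= now;
    }
}

class TtlValueAccess<V> {
    private final long ttlMillis;
    private TtlValue<V> stored; // stand-in for the wrapped state backend entry

    TtlValueAccess(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(V value) {
        stored = new TtlValue<>(value, ttlMillis);
    }

    V get() {
        if (stored == null) {
            return null;
        }
        if (stored.isExpired(System.currentTimeMillis())) {
            stored = null; // clean up on explicit access
            return null;
        }
        return stored.value;
    }
}
{code}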

Please, feel free to give any feedback and comments.

Thanks,
Andrey

On 27 May 2018, at 09:46, sihua zhou  wrote:

Hi Bowen,


Thanks for your clarification. I agree that we should wait for the timer on
RocksDB to be finished; after that we could even do some micro-benchmarks before
starting the implementation.


Best, Sihua






On 05/27/2018 15:07,Bowen Li wrote:
Thank you Fabian and Sihua. I explained more in the doc itself and its
comments. I believe the bottom line of v1 are 1) it shouldn't be worse than
how users implement TTL themselves today, 2) we should develop a generic
architecture for TTL for all (currently two) state backends (impl can
vary), 3) optimizations and improvements can come at v2 or later version.

For Sihua proposal, similar to the old plan we came up, I share similar
concerns as before and wonder if you have answers:

- it requires building custom compaction for both state backends, it's a
bit unclear in:
- when and who and how? The 'when' might be the hardest one, because
it really depends on user's use cases. E.g. if it's once a day, at what
time in a day?
- how well it will integrate with Flink's checkpoint/savepoint
mechanism?
- any performance regression indications in either state backends?
- how much is the ROI if it requires very complicated implementation?
- I'm afraid, eventually, the optimization will easily go to a tricky
direction we may want to avoid - shall we come up with extra design to
amortize the cost?
- I'm afraid the custom compaction logic will have to make some quite
different assumptions of different state backends. E.g. It's harder to
estimate total memory required for user's app in Heap statebackend then,
because it depends on when you trigger the compaction and how strictly you
will stick to the schedule everyday. Any undeterministic behavior may lead
to users allocating less memory than enough, and eventually causes user's
apps to be unstable
- I want to highlight that lots of users actually want the exact TTL
feature. How users implement TTL with timers today actually implies that
their logic depends on exact TTL for both shrinking their state size and
expiring a key at exactly an expected time, I chatted with a few different
Flink users recently and confirmed it. That's why I want to add exact TTL
as a potential goal and motivation if possible, along with relaxed TTL and
avoiding indefinitely growing state. If we don't provide that out of box,
many users may still use the timer way themselves

To the concern of doubling keys - in the Heap state backend, the key is only a
reference so there's only one copy, that's not a problem; in the rocksdb state
backend, yes, the state size will be bigger. Well, first, I believe this is a
tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
SSD) is relatively cheap and accessible, and we don't normally take it as a
big constraint. Second, w.r.t. performance, I believe rocksdb timers
will sit in a different column family than other state, which may not cause a
noticeable perf issue. The rocksdb timer service is on its way, and I want
to see how it's implemented first before asserting whether there is truly any
potential perf burden. Finally, there're also improveme

[jira] [Created] (FLINK-9480) Let local recovery support rescaling

2018-05-30 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9480:
-

 Summary: Let local recovery support rescaling
 Key: FLINK-9480
 URL: https://issues.apache.org/jira/browse/FLINK-9480
 Project: Flink
  Issue Type: Improvement
Reporter: Sihua Zhou


Currently, local recovery only supports restoring from a checkpoint without
rescaling. Maybe we should enable it to support rescaling as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9479) Let the rescale API to use local recovery

2018-05-30 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9479:
-

 Summary: Let the rescale API to use local recovery
 Key: FLINK-9479
 URL: https://issues.apache.org/jira/browse/FLINK-9479
 Project: Flink
  Issue Type: Improvement
  Components: REST, State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou


Currently, Flink's online rescale API operates as follows:
- trigger a savepoint for the job
- rescale the job from the savepoint

We should improve it to use local recovery, to speed it up and reduce the
network pressure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9475) introduce an approximate version of "select distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9475:
-

 Summary: introduce an approximate version of "select distinct"
 Key: FLINK-9475
 URL: https://issues.apache.org/jira/browse/FLINK-9475
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Base on the "Elastic Bloom Filter", it easy to implement an approximate version 
of "select distinct" that have an excellent performance. Its accuracy should be 
configurable, e.g. 95%, 98%.
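For illustration, an approximate DISTINCT can be sketched with a plain bloom filter.
This is a toy version using Guava's BloomFilter, kept on the heap and not checkpointed,
which is exactly the gap the Elastic Bloom Filter state is meant to fill; the accuracy
knob is the false positive rate.

{code:java}
import java.nio.charset.StandardCharsets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ApproximateDistinct extends RichFlatMapFunction<String, String> {

    private final long expectedRecords;
    private final double falsePositiveRate; // e.g. 0.02 for roughly 98% accuracy
    private transient BloomFilter<byte[]> seen;

    public ApproximateDistinct(long expectedRecords, double falsePositiveRate) {
        this.expectedRecords = expectedRecords;
        this.falsePositiveRate = falsePositiveRate;
    }

    @Override
    public void open(Configuration parameters) {
        seen = BloomFilter.create(Funnels.byteArrayFunnel(), expectedRecords, falsePositiveRate);
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        byte[] key = value.getBytes(StandardCharsets.UTF_8);
        if (!seen.mightContain(key)) {
            // probably new: remember it and emit it once (modulo false positives)
            seen.put(key);
            out.collect(value);
        }
    }
}
{code}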



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-05-29 Thread sihua zhou
Hi,


I did a survey of the variants of the Bloom Filter and the Cuckoo Filter these
days. In the end, I found 3 of them that may be suitable for our purpose.


1. standard bloom filter (we have implemented our version based on this and used it in
production with good results)
2. cuckoo filter, also a very good, space-efficient data structure that supports
fast queries (even faster than BF, though inserts may be a little slower than BF);
additionally it supports a delete() operation.
3. counting bloom filter, a variant of BF that supports a delete() operation, but it
costs 4-5x more memory than the standard bloom filter (so I'm not sure whether it
is practical).


Anyway, these filters are just the smallest storage unit in this "Elastic Bloom
Filter"; we can define a general interface and provide different
implementations of the "storage unit" based on different filters if we want. Maybe I
should change the PROPOSAL name to "Introduce Elastic Filter For Flink".
The idea of the approach I outlined in the doc is very similar to the paper
"Optimization and Applications of Dynamic Bloom
Filters" (http://ijarcs.info/index.php/Ijarcs/article/viewFile/826/814); compared
to the paper, the approach I outlined could have better query performance and
also supports the RELAXED TTL, so maybe it helps to understand the design doc.
Looking forward to any feedback!


Best, Sihua
On 05/24/2018 10:36,sihua zhou wrote:
Hi,
Thanks for your suggestions @Elias! I had a brief look at the "Cuckoo Filter" and
the "Golomb Compressed Sequence"; my first impression is that the "Golomb
Compressed Sequence" may not be a good choice, because it seems to require
non-constant lookup time, but the Cuckoo Filter may be a good choice, so I should
definitely have a deeper look at it.


Besides, to me, all of these filters seem to be "variants" of the bloom
filter (which is the smallest unit for storing data in the current design). The
main challenge for introducing the BF into Flink is the data skew problem (which is
a common phenomenon in production); could you maybe also have a look at the
solution for this problem that I posted in the Google doc
https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
? It would be nice if you could give us some advice on that.


Best, Sihua


On 05/24/2018 07:21,Elias Levy wrote:
I would suggest you consider alternative data structures: a Cuckoo
Filter or a Golomb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters
<http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf> by
F. Putze, P. Sanders, and J. Singler.  See section 4.



We should discuss which exact implementation of bloom filters are the best
fit.
@Fabian: There are also implementations of bloom filters that use counting
and therefore support
deletes, but obviously this comes at the cost of a potentially higher
space consumption.

Am 23.05.2018 um 11:29 schrieb Fabian Hueske :
IMO, such a feature would be very interesting. However, my concern with
Bloom Filters
is that they are insert-only data structures, i.e., it is not possible to
remove keys once
they were added. This might render the filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom
Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over
time?




[jira] [Created] (FLINK-9474) Introduce an approximate version of "count distinct"

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9474:
-

 Summary: Introduce an approximate version of "count distinct"
 Key: FLINK-9474
 URL: https://issues.apache.org/jira/browse/FLINK-9474
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We can implement an approximate version of "count distinct" based on the
"Elastic Bloom Filter". It could be very fast because we don't need to query
the state anymore; its accuracy should be configurable, e.g. 95%, 98%.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9468) get outputLimit of LimitedConnectionsFileSystem incorrectly

2018-05-29 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9468:
-

 Summary: get outputLimit of LimitedConnectionsFileSystem 
incorrectly
 Key: FLINK-9468
 URL: https://issues.apache.org/jira/browse/FLINK-9468
 Project: Flink
  Issue Type: Bug
  Components: FileSystem
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


In {{LimitedConnectionsFileSystem#createStream}}, we get the outputLimit 
incorrectly.
{code:java}
private <T extends StreamWithTimeout> T createStream(
        final SupplierWithException<T, IOException> streamOpener,
        final HashSet<T> openStreams,
        final boolean output) throws IOException {

    final int outputLimit = output && maxNumOpenInputStreams > 0 ?
            maxNumOpenOutputStreams : Integer.MAX_VALUE;
    /**/
}
{code}

should be 
{code:java}
private <T extends StreamWithTimeout> T createStream(
        final SupplierWithException<T, IOException> streamOpener,
        final HashSet<T> openStreams,
        final boolean output) throws IOException {

    final int outputLimit = output && maxNumOpenOutputStreams > 0 ?
            maxNumOpenOutputStreams : Integer.MAX_VALUE;
    /**/
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [PROPOSAL] Improving Flink’s timer management for large state

2018-05-27 Thread sihua zhou


I also +1 for this very good proposal!
In general, the design is good, especially the part related to the timers on the
Heap. Regarding the part about the timers on RocksDB, I think there may still be
some improvements we can make; I just left comments on the doc.


Best, Sihua




On 05/27/2018 15:31,Bowen Li wrote:
+1 LGTM. RocksDB timer service is one of the most highly anticipated
features from Flink users, and it's finally coming, officially. I would also
love to see timers brought closer to the state backends, for the
sake of easier development and maintenance of the code.

On Fri, May 25, 2018 at 7:13 AM, Stefan Richter 

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-05-27 Thread sihua zhou
Hi Bowen,


Thanks for your clarification. I agree that we should wait for the timer on
RocksDB to be finished; after that we could even do some micro-benchmarks before
starting the implementation.


Best, Sihua






On 05/27/2018 15:07,Bowen Li<bowenl...@gmail.com> wrote:
Thank you Fabian and Sihua. I explained more in the doc itself and its
comments. I believe the bottom line of v1 are 1) it shouldn't be worse than
how users implement TTL themselves today, 2) we should develop a generic
architecture for TTL for all (currently two) state backends (impl can
vary), 3) optimizations and improvements can come at v2 or later version.

For Sihua proposal, similar to the old plan we came up, I share similar
concerns as before and wonder if you have answers:

- it requires building custom compaction for both state backends, it's a
bit unclear in:
- when and who and how? The 'when' might be the hardest one, because
it really depends on user's use cases. E.g. if it's once a day, at what
time in a day?
- how well it will integrate with Flink's checkpoint/savepoint
mechanism?
- any performance regression indications in either state backends?
- how much is the ROI if it requires very complicated implementation?
- I'm afraid, eventually, the optimization will easily go to a tricky
direction we may want to avoid - shall we come up with extra design to
amortize the cost?
- I'm afraid the custom compaction logic will have to make quite
different assumptions for different state backends. E.g. it becomes harder to
estimate the total memory required for a user's app with the heap state backend,
because it depends on when you trigger the compaction and how strictly you
stick to the schedule every day. Any nondeterministic behavior may lead
to users allocating less memory than needed, and eventually causes users'
apps to be unstable
- I want to highlight that lots of users actually want the exact TTL
feature. How users implement TTL with timers today actually implies that
their logic depends on exact TTL for both shrinking their state size and
expiring a key at exactly an expected time, I chatted with a few different
Flink users recently and confirmed it. That's why I want to add exact TTL
as a potential goal and motivation if possible, along with relaxed TTL and
avoiding indefinitely growing state. If we don't provide that out of box,
many users may still use the timer way themselves

To the concern of doubling keys - in the heap state backend, the key is only a
reference so there's only one copy, that's not a problem; in the RocksDB state
backend, yes, the state size will be bigger. Well, first, I believe this is a
tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
SSD) is relatively cheap and accessible, and we don't normally take it as a
big constraint. Second, w.r.t. performance, I believe the RocksDB timers
will sit in a different column family than the other state, which may not cause
a noticeable perf issue. The RocksDB timer service is on its way, and I want
to see how it's implemented first before asserting whether there's truly any
potential perf burden. Finally, there are also improvements we can make
after v1, including relaxed TTL and smaller timer state size. E.g. Flink
can approximate timers within a user-configured time range (say within 5
sec) into a single timer. I don't have a concrete plan for that yet, but
it's doable.
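
For illustration, a minimal sketch of that coalescing idea: every requested expiration is rounded up to a configurable bucket so that many keys share one physical timer. All names here are illustrative, not a proposed Flink API:

```
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of coalescing TTL timers into buckets of a configured width (e.g. 5 seconds).
 * Instead of one timer per key, all expirations that fall into the same bucket share
 * a single physical timer; registerTimer() stands in for whatever timer service is used.
 */
public class CoalescingTtlTimers {

    private final long bucketMillis;
    private final Set<Long> registeredBuckets = new HashSet<>();

    public CoalescingTtlTimers(long bucketMillis) {
        this.bucketMillis = bucketMillis;
    }

    public void scheduleExpiration(long expirationTimestamp) {
        // Round the requested expiration up to the next bucket boundary.
        long bucket = ((expirationTimestamp + bucketMillis - 1) / bucketMillis) * bucketMillis;
        if (registeredBuckets.add(bucket)) {
            registerTimer(bucket); // only one physical timer per bucket
        }
    }

    private void registerTimer(long timestamp) {
        // Placeholder for the actual timer registration (e.g. an InternalTimerService call);
        // when the timer fires, the bucket would also be removed from registeredBuckets.
    }
}
```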

Stefan is adding the RocksDB timer and bringing the timer service closer to
the keyed backends, which aligns very well with what we want in this FLIP. I
suggest we wait and keep a close eye on those efforts; as they mature,
we'll have a much better idea of the whole picture.

Thanks, Bowen



On Sat, May 26, 2018 at 7:52 AM, sihua zhou <summerle...@163.com> wrote:



Hi,


Thanks for your reply Fabian. About the overhead of storing the key bytes
twice, I think the situation is maybe even a bit worse: in general, it
means that the total amount of data to be stored is doubled (for each key,
we need to store two records, one for the timer, one for the state). This may be
a bit uncomfortable when the state backend is based on RocksDB, because the
timers live together with the other state in the same RocksDB
instance, which means that when using TTL, the number of records in
RocksDB has to be doubled; I'm afraid this may hurt its performance.


Concerning the approach of adding a timestamp to each value, TBH, I haven't
thought deeply about it yet and am also not sure about it... In general, it
can be described as follows:


- We attach a TS to every state record.
- When getting the record, we check the TS to see if it's outdated.
- For the records that we will never touch again, we use compaction to
remove them; maybe one compaction per day is enough.


Best, Sihua


On 05/16/2018 16:38,Fabian Hueske<fhue...@gmail.com> wrote:
Hi,


Yes. IMO it makes sense to put the logic into the abstract base classes to
share the implementation across different state backends and state
primitives.

The o

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-05-26 Thread sihua zhou


Hi,


Thanks for your reply Fabian. About the overhead of storing the key bytes 
twice, I think the situation is maybe even a bit worse: in general, it means 
that the total amount of data to be stored is doubled (for each key, we need to 
store two records, one for the timer, one for the state). This may be a bit 
uncomfortable when the state backend is based on RocksDB, because the timers 
live together with the other state in the same RocksDB instance, which 
means that when using TTL, the number of records in RocksDB has to be 
doubled; I'm afraid this may hurt its performance.


Concerning the approach of adding a timestamp to each value, TBH, I haven't 
thought deeply about it yet and am also not sure about it... In general, it can be 
described as follows:


- We attach a TS to every state record.
- When getting the record, we check the TS to see if it's outdated.
- For the records that we will never touch again, we use compaction to 
remove them; maybe one compaction per day is enough.
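
A minimal sketch of this idea, assuming a simple wrapper that stores the write timestamp next to the value (illustrative only, not a proposed Flink API):

```
/**
 * Sketch of the "timestamp per state value" idea: each stored value carries the time
 * it was written, reads treat expired entries as absent, and a periodic compaction /
 * sweep can physically drop entries whose timestamp is older than the TTL.
 */
public class TtlValue<V> {

    private final V value;
    private final long writeTimestamp;

    public TtlValue(V value, long writeTimestamp) {
        this.value = value;
        this.writeTimestamp = writeTimestamp;
    }

    /** Returns the value, or null if it is older than the given TTL. */
    public V unwrapIfFresh(long ttlMillis, long currentTime) {
        return (currentTime - writeTimestamp) > ttlMillis ? null : value;
    }

    public long getWriteTimestamp() {
        return writeTimestamp; // used by the compaction / sweep to decide what to drop
    }
}
```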


Best, Sihua


On 05/16/2018 16:38,Fabian Hueske<fhue...@gmail.com> wrote:
Hi,


Yes. IMO it makes sense to put the logic into the abstract base classes to 
share the implementation across different state backends and state primitives.

The overhead of storing the key twice is a valid concern, but I'm not sure 
about the approach to add a timestamp to each value.
How would we discard state then? By always iterating over all (or a range of) keys 
to check if their state should be expired?
That would only work efficiently if we relax the clean-up logic, which could be 
a valid design decision.



Best, Fabian



2018-05-14 9:33 GMT+02:00 sihua zhou <summerle...@163.com>:

Hi Fabian,
thank you very much for the reply, just an alternative: can we implement the 
TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest way is to 
append a `ts` to the state's value, and use the backend's `current 
time` (it can also be event time or processing time) to judge whether the data is 
outdated. The pros are that:
- state is purely backed by the state backend.
- for each key-value, we only need to store one copy of the key (if we 
implement TTL based on timers, we need to store two copies of the key, one for the 
timer and one for the keyed state)


What do you think?


Best,
Sihua


On 05/14/2018 15:20,Fabian Hueske<fhue...@gmail.com> wrote:
Hi Sihua,


I think it makes sense to couple state TTL to the timer service. We'll need 
some kind of timers to expire state, so I think we should reuse components that 
we have instead of implementing another timer service.

Moreover, using the same timer service and using the public state APIs helps to 
have a consistent TTL behavior across different state backends.


Best, Fabian



2018-05-14 8:51 GMT+02:00 sihua zhou <summerle...@163.com>:

Hi Bowen,
thanks for your doc! I left some comments on the doc; my main concern is 
that it feels like a coupling that the TTL needs to depend on the `timer`. 
Because I think the TTL is a property of the state, it should be backed by 
the state backend. If we implement the TTL based on the timer, then it looks 
like a coupling... it makes me feel that the backend for state becomes `state 
backend` + `timer`. And in fact, IMO, even the `timer` should depend on the `state 
backend` in theory; it's a type of HeapState that is scoped to the `key group` (not 
scoped per key like the current keyed state).


And I found the doc is for exact TTL; I wonder if we can support a relaxed TTL 
that could provide better performance. Because to me, the reason I need 
TTL is just to prevent the state size from growing infinitely (I believe I'm not the 
only one like this), so a relaxed version is enough, and if there is a relaxed TTL 
with better performance, I would prefer that.


What do you think?


Best,
Sihua






On 05/14/2018 14:31,Bowen Li<bowenl...@gmail.com> wrote:
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>
.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently, is like this: let *InternalTimerService
*maintains user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers,  e.g.
*Ttl*ProcessingTimeTimersQueue
and *Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
InternalTimerService#onProcessingTime() and advanceWatermark(), they will
first iterate through ProcessingTimeTimers and EventTimeTimers (user
timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers

(Ttl timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N name

[jira] [Created] (FLINK-9426) Harden RocksDBWriteBatchPerformanceTest.benchMark()

2018-05-23 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9426:
-

 Summary: Harden RocksDBWriteBatchPerformanceTest.benchMark()
 Key: FLINK-9426
 URL: https://issues.apache.org/jira/browse/FLINK-9426
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


We use an assert to check that the performance of WriteBatch is better than Put(); 
this should be true in general, but sometimes it can also be false. We 
may need to follow the other tests under 
*org.apache.flink.contrib.streaming.state.benchmark.** and only use the timeout 
property to validate the test.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-05-23 Thread sihua zhou


Thanks for your reply @Fabian and @Stefan


@Fabian: The bloom filter state I propose would be "elastic" and use "lazy 
allocation". What we have on each key group is a list of bloom filter 
nodes (which is shrinkable); every bloom filter node has its capacity, and we 
allocate a new one only when the last one is filled. There is also a 
relaxed TTL to recycle the memory resources so that it can keep running properly. For 
example, consider the situation on key group 0:

key group0: bfNode0 -> bfNode1 -> bfNode2 

If bfNode2 is filled, we allocate a new BF node to store the data that falls into key 
group 0, and the situation on key group 0 becomes:

key group0: bfNode0 -> bfNode1 -> bfNode2 -> bfNode3

Once a bfNode is filled, it will never be changed again. So, for example, if 
bfNode0 is filled at time point t1 and the TTL = 2 hours, then we can 
release bfNode0 at time point "t1 + 2 hours". After that the situation on key 
group 0 becomes:

key group0: bfNode1 -> bfNode2 -> bfNode3
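
To make this concrete, here is a minimal sketch of one key group's node chain, assuming a fixed per-node capacity and using Guava's BloomFilter as a stand-in for the actual filter implementation (all names are illustrative, not the proposed API):

```
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

/**
 * Sketch of one key group's linked filter nodes: only the newest (tail) node is
 * writable, full nodes are frozen, and frozen nodes older than the relaxed TTL are
 * dropped to bound memory.
 */
public class KeyGroupFilterChain {

    private static final int NODE_CAPACITY = 1_000_000;

    private final Deque<FilterNode> nodes = new ArrayDeque<>();

    public void add(String key, long now) {
        FilterNode writable = nodes.peekLast();
        if (writable == null || writable.isFull()) {
            writable = new FilterNode();     // lazy allocation: only grow when the last node is filled
            nodes.addLast(writable);
        }
        writable.add(key, now);
    }

    public boolean mightContain(String key) {
        for (FilterNode node : nodes) {      // check the frozen nodes and the writable tail
            if (node.mightContain(key)) {
                return true;
            }
        }
        return false;
    }

    /** Releases frozen nodes whose newest entry is older than the relaxed TTL. */
    public void expire(long now, long ttlMillis) {
        while (nodes.size() > 1 && now - nodes.peekFirst().lastAddTime > ttlMillis) {
            nodes.removeFirst();
        }
    }

    private static final class FilterNode {
        private final BloomFilter<String> filter =
                BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), NODE_CAPACITY, 0.01);
        private int size;
        private long lastAddTime;

        void add(String key, long now) {
            filter.put(key);
            size++;
            lastAddTime = now;
        }

        boolean isFull() {
            return size >= NODE_CAPACITY;
        }

        boolean mightContain(String key) {
            return filter.mightContain(key);
        }
    }
}
```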



What do you think of this?


@Stefan: Yes, I definitely agree with your point. And we should discuss the 
implementation deeply before jumping into it.


Best,
Sihua






On 05/23/2018 17:33,Stefan Richter<s.rich...@data-artisans.com> wrote:
Hi,

In general, I like the proposal as well. We should try to integrate all forms 
of keyed state with the backend, to avoid the problems that we are currently 
facing with the timer service. We should discuss which exact implementation of 
bloom filters is the best fit.

@Fabian: There are also implementations of bloom filters that use counting and 
therefore support deletes, but obviously this comes at the cost of a 
potentially higher space consumption.

Best,
Stefan

Am 23.05.2018 um 11:29 schrieb Fabian Hueske <fhue...@gmail.com>:

Thanks for the proposal Sihua!

Let me try to summarize the motivation / scope of this proposal.

You are proposing to add support for a special Bloom Filter state per KeyGroup 
and reduce the number of key accesses by checking the Bloom Filter first.

This would be a rather generic feature that could be interesting for various 
applications, including joins and deduplication as you described.

IMO, such a feature would be very interesting. However, my concern with Bloom 
Filters is that they are insert-only data structures, i.e., it is not possible 
to remove keys once they have been added. This might render the filter useless over 
time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned that 
the Bloom Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory 
boundaries over time?

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8918 
<https://issues.apache.org/jira/browse/FLINK-8918>

2018-05-23 9:56 GMT+02:00 sihua zhou <summerle...@163.com 
<mailto:summerle...@163.com>>:
Hi Devs!
I propose to introduce an "Elastic Bloom Filter" for Flink. The reason I make 
this proposal is that it has helped us a lot in production; it lets us improve 
performance while reducing resource consumption. Here is a brief description 
of the motivation for why it is so powerful; more detailed information can be found at 
https://issues.apache.org/jira/browse/FLINK-8601 
<https://issues.apache.org/jira/browse/FLINK-8601>, and the design doc can be 
found at
https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
 
<https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing>


Motivation

There are several scenarios that drove us to introduce this ElasticBloomFilter: one is 
Stream Join, another is Data Deduplication, plus some special user cases... It 
has given us a great experience; for example, we implemented the Runtime 
Filter Join based on it, and it gives us a great performance improvement. 
This feature differentiates us from the "normal stream join" and allows us to improve 
performance while reducing resource consumption by about half!
I will list the two most typical use cases optimized by the 
ElasticBloomFilter: "Runtime Filter Join" in detail, and "Data 
Deduplication" in brief.

Scenario 1: Runtime Filter Join

In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state fr

[PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-05-23 Thread sihua zhou
Hi Devs!
I propose to introduce an "Elastic Bloom Filter" for Flink. The reason I make 
this proposal is that it has helped us a lot in production; it lets us improve 
performance while reducing resource consumption. Here is a brief description 
of the motivation for why it is so powerful; more detailed information can be found at 
https://issues.apache.org/jira/browse/FLINK-8601, and the design doc can be 
found at
https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing



Motivation


There are several scenarios that drove us to introduce this ElasticBloomFilter: one is 
Stream Join, another is Data Deduplication, plus some special user cases... It 
has given us a great experience; for example, we implemented the Runtime 
Filter Join based on it, and it gives us a great performance improvement. 
This feature differentiates us from the "normal stream join" and allows us to improve 
performance while reducing resource consumption by about half!
I will list the two most typical use cases optimized by the 
ElasticBloomFilter: "Runtime Filter Join" in detail, and "Data 
Deduplication" in brief.


Scenario 1: Runtime Filter Join


In general, stream join is one of the most performance-costly tasks. For every 
record from either side, we need to query the state of the other side; this 
leads to poor performance when the state size is huge. So, in production, 
we always need to spend a lot of slots to handle stream joins. But, indeed, we can 
improve this somewhat: there is a phenomenon of stream joins that can be observed in 
production, namely that the "joined ratio" of a stream join is often very low. For 
example:
stream join in promotion analysis: the job needs to join the promotion log with the 
action (click, view, buy) log on the promotion_id to analyze the effect of 
the promotion.
stream join in AD (advertising) attribution: the job needs to join the AD click log 
with the item payment log on the click_id to find which click of which AD 
brought the payment, in order to do attribution.
stream join in click log analysis of docs: the job needs to join the viewed log (docs 
viewed by users) with the click log (docs clicked by users) to analyze the 
reason for the click and the properties of the users.
... and so on
All these cases have one common property: the joined ratio is very low. 
Here is an example to describe it: suppose we have N records from the left stream 
and N records from the right stream, and we execute select * from 
leftStream l join rightStream r on l.id = r.id, but we only get 100 records in 
the result. That is a case of a low joined ratio. This is an example for an inner 
join, but it also applies to left & right joins.


There are more examples with a low joined ratio that I could come up with... but the point I want 
to raise is that a low joined ratio for stream joins in production is a very 
common phenomenon (maybe even the most common phenomenon in some companies; at 
least in our company that is the case).


How to improve this?


We can see from the above case that joining N records with N records yields only 
100 results. That means we query the state 2N times (N for the left 
stream and N for the right stream) but only 100 of those queries are meaningful! 
If we could reduce the number of useless queries, we could definitely improve 
the performance of stream join.
The way we improve this is to introduce the Runtime Filter Join. The 
main idea is that we build a filter for the state on each side (left stream 
& right stream). When we need to query the state on one side, we first check 
the corresponding filter to see whether the key could possibly be in the state: if the 
filter says "no, it cannot be in the state", we skip querying the state, 
and if it says "hmm, it may be in the state", then we query the state. As you can 
see, the best choice for the filter is a Bloom Filter, since it has all the features that 
we want: extremely good performance and no false negatives.
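
For illustration, a minimal sketch of the per-side check, using Guava's BloomFilter and a plain map as stand-ins for the Elastic Bloom Filter and the keyed state (all names are illustrative):

```
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

/**
 * Sketch of the runtime-filter idea for one side of the join: before querying the
 * (potentially huge) state of the other side, consult its bloom filter first. The
 * filter has no false negatives, so a "definitely absent" answer lets us skip the
 * state lookup entirely; only "maybe present" keys pay the full query cost.
 */
public class RuntimeFilteredSide {

    private final Map<String, String> otherSideState = new HashMap<>(); // stands in for keyed state
    private final BloomFilter<String> otherSideFilter =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000_000, 0.01);

    /** Called when a record arrives on the other side. */
    public void addToOtherSide(String key, String value) {
        otherSideState.put(key, value);
        otherSideFilter.put(key);
    }

    /** Called when a record arrives on this side and needs to probe the other side. */
    public String probe(String key) {
        if (!otherSideFilter.mightContain(key)) {
            return null; // definitely no match: skip the expensive state query
        }
        return otherSideState.get(key); // may still be a false positive
    }
}
```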


Scenario 2:  Data Deduplication


We have implemented two general functions based on the ElasticBloomFilter: 
count(distinct x) and select distinct x, y, z from table. Unlike the 
Runtime Filter Join, the result of these two functions is approximate, not 
exact. They are used in scenarios where we don't need a 100% accurate 
result, for example, to count the number of visiting users in each online 
store. In general, we don't need a 100% accurate result in this case (indeed we 
can't give a 100% accurate result anyway, because there can be errors when collecting 
user_ids from different devices); if we can get a 98% accurate result with 
only half the resources, that is very nice.


I believe there are more use cases in the streaming world that could be 
optimized by the Bloom Filter (as it has already done in the big data world)...


I would appreciate it very much if someone could have a look at the JIRA or the 
Google doc and give some comments!


Thanks, Sihua



Re:[VOTE] Release 1.5.0, release candidate #5

2018-05-22 Thread sihua zhou


Hi,
Just one minor thing: I found the JIRA release notes seem a bit inconsistent 
with this RC. For example, https://issues.apache.org/jira/browse/FLINK-9058 
hasn't been merged yet but is included in the release notes, and 
https://issues.apache.org/jira/browse/FLINK-9070 has been merged but is not 
included in the release notes.


Best, Sihua




On 05/23/2018 09:18,Till Rohrmann wrote:
Hi everyone,

Please review and vote on the release candidate #5 for the version 1.5.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 1F302569A96CFFD5 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.5.0-rc5" [5],

Please use this document for coordinating testing efforts: [6]

Since the newly included fixes affect only individual components and are
covered by tests, I will shorten the voting period until tomorrow 5:30pm
CET. It is adopted by majority approval, with at least 3 PMC affirmative
votes.

Thanks,
Your friendly Release Manager

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341764
[2] http://people.apache.org/~trohrmann/flink-1.5.0-rc5/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1160/
[5]
https://git-wip-us.apache.org/repos/asf?p=flink.git;a=commit;h=841bfe4cceecc9cd6ad3d568173fdc0149a5dc9b
[6]
https://docs.google.com/document/d/1C1WwUphQj597jExWAXFUVkLH9Bi7-ir6drW9BgB8Ezo/edit?usp=sharing

Pro-tip: you can create a settings.xml file with these contents:



<settings>
  <activeProfiles>
    <activeProfile>flink-1.5.0</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>flink-1.5.0</id>
      <repositories>
        <repository>
          <id>flink-1.5.0</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1160/</url>
        </repository>
        <repository>
          <id>archetype</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1160/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>







And reference that in your maven commands via --settings
path/to/settings.xml. This is useful for creating a quickstart based on the
staged release and for building against the staged jars.


[jira] [Created] (FLINK-9401) Data lost when rescaling the job from incremental checkpoint

2018-05-18 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9401:
-

 Summary: Data lost when rescaling the job from incremental 
checkpoint
 Key: FLINK-9401
 URL: https://issues.apache.org/jira/browse/FLINK-9401
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2, 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We may lose data when rescaling the job from an incremental checkpoint because of the 
following code.
{code:java}
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, 
columnFamilyHandle)) {

   int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
  startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
   }

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

  int keyGroup = 0;
  for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
 keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
  }

  if (stateBackend.keyGroupRange.contains(keyGroup)) {
 stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
  }

  iterator.next();
   }
}
{code}

For every state handle, to fetch the target data we 
_seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could become 
INVALID immediately if the state handle's _start key group_ is bigger than 
_state.keyGroupRange.getStartKeyGroup()_. Then, data is lost...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] Release 1.5.0, release candidate #3

2018-05-17 Thread sihua zhou
I will update the PR immediately once I finish the dinner.

Best, Sihua




| |
sihua zhou
邮箱:summerle...@163.com
|

签名由 网易邮箱大师 定制




在2018年05月17日 18:29,Till Rohrmann 写道:
Hi Sihua,

thanks for making me aware. This sounds indeed like a problem which might
cause the data loss. I think it's worth fixing since it does not look like
a big thing. I'll postpone the RC until we've fixed this issue. Hopefully
we get it in in the next couple of hours.

Cheers,
Till

On Thu, May 17, 2018 at 4:37 AM, sihua zhou <summerle...@163.com> wrote:

>
>
> Hi,
> And we found this one[1]. It is an issue that could lead to data
> losing(checkpoint & restoring) when people using the RocksDBBackend, cause
> by the not so nice APIs of RocksIterator...had a hard time to believe it,
> but this seems to affect all the release versions, so I'm not sure whether
> it's a blocker to 1.5.
>
>
> [1]https://issues.apache.org/jira/browse/FLINK-9373
>
>
> Best, Sihua
> On 05/17/2018 03:27,Till Rohrmann<trohrm...@apache.org> wrote:
> Testing the RC has surfaced a problem with the release of blobs of finished
> jobs [1]. This is a release blocker and, thus, I have to cancel the RC 3.
> I'll prepare a new RC once the problem has been fixed.
>
> Thanks to all for testing the release candidate!
>
> [1] https://issues.apache.org/jira/browse/FLINK-9381?filter=12327438
>
> Cheers,
> Till
>
> On Tue, May 15, 2018 at 5:32 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> Hi everyone,
>
> Please review and vote on the release candidate #3 for the version 1.5.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint 1F302569A96CFFD5 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.5.0-rc3" [5],
>
> Please use this document for coordinating testing efforts: [6]
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Your friendly Release Manager
>
> [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?
> projectId=12315522=12341764
> [2] http://people.apache.org/~trohrmann/flink-1.5.0-rc3/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1156
> [5] https://git-wip-us.apache.org/repos/asf?p=flink.git;a=commit;h=
> e7252690af7c4c1167dac8f2dd08ff33d6d829b5
> [6] https://docs.google.com/document/d/11DmsJDe-
> 4ljHgXByHqFQsDRH_gjuKGQHpDZ9N-Frtyg/edit?usp=sharing
>
> Pro-tip: you can create a settings.xml file with these contents:
>
> 
> 
> flink-1.5.0
> 
> 
> 
> flink-1.5.0
> 
> 
> flink-1.5.0
> 
> https://repository.apache.org/content/repositories/
> orgapacheflink-1156/
> 
> 
> 
> archetype
> 
> https://repository.apache.org/content/repositories/
> orgapacheflink-1156/
> 
> 
> 
> 
> 
> 
>
> And reference that in you maven commands via --settings
> path/to/settings.xml. This is useful for creating a quickstart based on the
> staged release and for building against the staged jars.
>
>
>


Re: [VOTE] Release 1.5.0, release candidate #3

2018-05-17 Thread sihua zhou


Hi,
And we found this one [1]. It is an issue that could lead to data 
loss (in checkpointing & restoring) when people use the RocksDBBackend, caused by 
the not-so-nice APIs of RocksIterator... I had a hard time believing it, but this 
seems to affect all the released versions, so I'm not sure whether it's a 
blocker for 1.5.


[1]https://issues.apache.org/jira/browse/FLINK-9373


Best, Sihua
On 05/17/2018 03:27,Till Rohrmann wrote:
Testing the RC has surfaced a problem with the release of blobs of finished
jobs [1]. This is a release blocker and, thus, I have to cancel the RC 3.
I'll prepare a new RC once the problem has been fixed.

Thanks to all for testing the release candidate!

[1] https://issues.apache.org/jira/browse/FLINK-9381?filter=12327438

Cheers,
Till

On Tue, May 15, 2018 at 5:32 PM, Till Rohrmann  wrote:

Hi everyone,

Please review and vote on the release candidate #3 for the version 1.5.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 1F302569A96CFFD5 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.5.0-rc3" [5],

Please use this document for coordinating testing efforts: [6]

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Your friendly Release Manager

[1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12341764
[2] http://people.apache.org/~trohrmann/flink-1.5.0-rc3/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1156
[5] https://git-wip-us.apache.org/repos/asf?p=flink.git;a=commit;h=
e7252690af7c4c1167dac8f2dd08ff33d6d829b5
[6] https://docs.google.com/document/d/11DmsJDe-
4ljHgXByHqFQsDRH_gjuKGQHpDZ9N-Frtyg/edit?usp=sharing

Pro-tip: you can create a settings.xml file with these contents:



<settings>
  <activeProfiles>
    <activeProfile>flink-1.5.0</activeProfile>
  </activeProfiles>
  <profiles>
    <profile>
      <id>flink-1.5.0</id>
      <repositories>
        <repository>
          <id>flink-1.5.0</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1156/</url>
        </repository>
        <repository>
          <id>archetype</id>
          <url>https://repository.apache.org/content/repositories/orgapacheflink-1156/</url>
        </repository>
      </repositories>
    </profile>
  </profiles>
</settings>







And reference that in your maven commands via --settings
path/to/settings.xml. This is useful for creating a quickstart based on the
staged release and for building against the staged jars.




[jira] [Created] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9373:
-

 Summary: Always call RocksIterator.status() to check the internal 
error of RocksDB
 Key: FLINK-9373
 URL: https://issues.apache.org/jira/browse/FLINK-9373
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, when using RocksIterator we only use _iterator.isValid()_ to 
check whether we have reached the end of the iterator. But that is not enough: 
if we refer to RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we find 
that _iterator.isValid()_ returning false may also be caused by an internal error. A safer 
way to use the _RocksIterator_ is to always call _iterator.status()_ to check for 
internal errors of _RocksDB_. There is a case from the user mailing list that seems to 
have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
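
A minimal sketch of the safer iteration pattern (an illustrative helper, not the actual Flink code):

{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

/**
 * Sketch of the safer iteration pattern: isValid() alone cannot distinguish "end of
 * data" from "internal error", so status() is checked after the loop (it could also
 * be checked per step) to surface any error RocksDB hit while iterating.
 */
public final class SafeRocksIteration {

    public static void forEachEntry(RocksIterator iterator) throws RocksDBException {
        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
            byte[] key = iterator.key();
            byte[] value = iterator.value();
            // ... process the entry ...
        }
        // If isValid() returned false because of an internal error (e.g. a corrupted
        // block or an I/O failure), this throws instead of silently ending the loop.
        iterator.status();
    }

    private SafeRocksIteration() {
    }
}
{code}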



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9364) Add doc for the memory usage in flink

2018-05-15 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9364:
-

 Summary: Add doc for the memory usage in flink
 Key: FLINK-9364
 URL: https://issues.apache.org/jira/browse/FLINK-9364
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


We need to add a doc to describe the memory usage in Flink, especially when 
people use the RocksDBBackend; many people get confused about it (I've 
seen several questions related to this on the user mailing list).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-05-14 Thread sihua zhou
Hi Fabian,
thank you very much for the reply, just an alternative: can we implement the 
TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest way is to 
append a `ts` to the state's value, and use the backend's `current 
time` (it can also be event time or processing time) to judge whether the data is 
outdated. The pros are that:
- state is purely backed by the state backend.
- for each key-value, we only need to store one copy of the key (if we 
implement TTL based on timers, we need to store two copies of the key, one for the 
timer and one for the keyed state)


What do you think?


Best,
Sihua


On 05/14/2018 15:20,Fabian Hueske<fhue...@gmail.com> wrote:
Hi Sihua,


I think it makes sense to couple state TTL to the timer service. We'll need 
some kind of timers to expire state, so I think we should reuse components that 
we have instead of implementing another timer service.

Moreover, using the same timer service and using the public state APIs helps to 
have a consistent TTL behavior across different state backends.


Best, Fabian



2018-05-14 8:51 GMT+02:00 sihua zhou <summerle...@163.com>:

Hi Bowen,
thanks for your doc! I left some comments on the doc; my main concern is 
that it feels like a coupling that the TTL needs to depend on the `timer`. 
Because I think the TTL is a property of the state, it should be backed by 
the state backend. If we implement the TTL based on the timer, then it looks 
like a coupling... it makes me feel that the backend for state becomes `state 
backend` + `timer`. And in fact, IMO, even the `timer` should depend on the `state 
backend` in theory; it's a type of HeapState that is scoped to the `key group` (not 
scoped per key like the current keyed state).


And I found the doc is for exact TTL; I wonder if we can support a relaxed TTL 
that could provide better performance. Because to me, the reason I need 
TTL is just to prevent the state size from growing infinitely (I believe I'm not the 
only one like this), so a relaxed version is enough, and if there is a relaxed TTL 
with better performance, I would prefer that.


What do you think?


Best,
Sihua






On 05/14/2018 14:31,Bowen Li<bowenl...@gmail.com> wrote:
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>
.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently, is like this: let *InternalTimerService
*maintains user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers,  e.g.
*Ttl*ProcessingTimeTimersQueue
and *Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
InternalTimerService#onProcessingTime() and advanceWatermark(), they will
first iterate through ProcessingTimeTimers and EventTimeTimers (user
timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers

(Ttl timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to option 1, is that it doesn't impact
existing timer-related checkpoint/savepoint, restore and migrations.

What do you think?  And, any other Flink committers want to chime in for
ideas? I've also documented the above two discussion points to the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:


Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up.
The main problem that I see right now is that we have to distinguish
between user and TTL timers. AFAIK, the timer service does not support
timer tags (or another method) to distinguish timers.

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li <bowenl...@gmail.com>:


Thanks Fabian! Here're my comments inline, and let me know your thoughts.

1. Where should the TTL code reside? In the state backend or in the
operator?

I believe TTL code should not reside in state backend, because a critical
design is that TTL is independent of and transparent to state backends.

According to my current knowledge, I think it probably should live with
operators in flink-streaming-java.


2. How to get no

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

2018-05-14 Thread sihua zhou
Hi Bowen,
thanks for your doc! I left some comments on the doc; my main concern is 
that it feels like a coupling that the TTL needs to depend on the `timer`. 
Because I think the TTL is a property of the state, it should be backed by 
the state backend. If we implement the TTL based on the timer, then it looks 
like a coupling... it makes me feel that the backend for state becomes `state 
backend` + `timer`. And in fact, IMO, even the `timer` should depend on the `state 
backend` in theory; it's a type of HeapState that is scoped to the `key group` (not 
scoped per key like the current keyed state).


And I found the doc is for exact TTL; I wonder if we can support a relaxed TTL 
that could provide better performance. Because to me, the reason I need 
TTL is just to prevent the state size from growing infinitely (I believe I'm not the 
only one like this), so a relaxed version is enough, and if there is a relaxed TTL 
with better performance, I would prefer that.


What do you think?


Best,
Sihua






On 05/14/2018 14:31,Bowen Li wrote:
Thank you, Fabian! I've created the FLIP-25 page

.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently, is like this: let *InternalTimerService
*maintains user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers,  e.g.
*Ttl*ProcessingTimeTimersQueue
and *Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
InternalTimerService#onProcessingTime() and advanceWatermark(), they will
first iterate through ProcessingTimeTimers and EventTimeTimers (user
timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers
(Ttl timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to option 1, is that it doesn't impact
existing timer-related checkpoint/savepoint, restore and migrations.

What do you think?  And, any other Flink committers want to chime in for
ideas? I've also documented the above two discussion points to the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske  wrote:

Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up.
The main problem that I see right now is that we have to distinguish
between user and TTL timers. AFAIK, the timer service does not support
timer tags (or another method) to distinguish timers.

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li :

Thanks Fabian! Here're my comments inline, and let me know your thoughts.

1. Where should the TTL code reside? In the state backend or in the
operator?

I believe TTL code should not reside in state backend, because a critical
design is that TTL is independent of and transparent to state backends.

According to my current knowledge, I think it probably should live with
operators in flink-streaming-java.


2. How to get notified about state accesses? I guess this depends on 1.

You previously suggested using callbacks. I believe that's the right way
to do decoupling.


3. How to avoid conflicts of TTL timers and user timers?

User timers might always be invoked first? This is not urgent, shall we
bake it for more time and discuss it along the way?



Besides, I don't have access to create a FLIP page under
https://cwiki.apache.org/confluence/display/FLINK/Flin
k+Improvement+Proposals. Can you grant me the proper access?

Thanks,

Bowen




On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske  wrote:

Hi Bowen,

Thanks for updating the proposal. This looks pretty good (as I said
before).
There are a few areas, that are not yet fully fleshed out:

1. Where should the TTL code reside? In the state backend or in the
operator?
2. How to get notified about state accesses? I guess this depends on 1.
3. How to avoid conflicts of TTL timers and user timers?

@Stefan (in CC) might have some ideas on these issues as well.

Cheers, Fabian

2018-04-22 21:14 GMT+02:00 Bowen :

Hello community,

We've come up with a completely new design for 

[jira] [Created] (FLINK-9351) RM stop assigning slot to Job because the TM killed before connecting to JM successfully

2018-05-13 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9351:
-

 Summary: RM stop assigning slot to Job because the TM killed 
before connecting to JM successfully
 Key: FLINK-9351
 URL: https://issues.apache.org/jira/browse/FLINK-9351
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Sihua Zhou


The steps are the following (copied from Stephan's comments in 
[5931|https://github.com/apache/flink/pull/5931]):

JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager
ResourceManager starts a container with a TaskManager
TaskManager registers at ResourceManager, which tells the TaskManager to push a 
slot to the JobManager.
TaskManager container is killed
The ResourceManager does not queue back the slot requests (AllocationIDs) that 
it sent to the previous TaskManager, so the requests are lost and need to time 
out before another attempt is tried.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9325) generate the _meta file only when the writing is totally successful

2018-05-09 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9325:
-

 Summary: generate the _meta file only when the writing is totally 
successful
 Key: FLINK-9325
 URL: https://issues.apache.org/jira/browse/FLINK-9325
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


We should generate the _meta file for a checkpoint only when the writing is 
totally successful. We should write the metadata file first to a temp file and 
then atomically rename it (with an equivalent workaround for S3). 
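
A minimal sketch of the temp-file-plus-atomic-rename idea for a local/POSIX file system (names are illustrative; the actual change would live in Flink's checkpoint storage code, and object stores like S3 need the equivalent workaround mentioned above):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Sketch of "write to temp file, then rename": readers can never observe a
 * half-written metadata file, because it only appears under its final name
 * after the atomic move succeeds.
 */
public final class AtomicMetadataWriter {

    public static void write(Path metadataFile, byte[] metadataBytes) throws IOException {
        Path tempFile = metadataFile.resolveSibling(metadataFile.getFileName() + ".inprogress");
        Files.write(tempFile, metadataBytes);
        Files.move(tempFile, metadataFile, StandardCopyOption.ATOMIC_MOVE);
    }

    private AtomicMetadataWriter() {
    }
}
{code}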



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async

2018-04-27 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9269:
-

 Summary: Concurrency problem in HeapKeyedStateBackend when 
performing checkpoint async
 Key: FLINK-9269
 URL: https://issues.apache.org/jira/browse/FLINK-9269
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
 Fix For: 1.5.0


{code:java}
@Nonnull
@Override
protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {

    long startTime = System.currentTimeMillis();

    CheckpointStreamFactory.CheckpointStateOutputStream localStream =
        this.streamAndResultExtractor.getCheckpointOutputStream();

    DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(localStream);
    serializationProxy.write(outView);

    long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];

    for (int keyGroupPos = 0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
        int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);
        keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
        outView.writeInt(keyGroupId);

        for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
            try (OutputStream kgCompressionOut =
                    keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
                DataOutputViewStreamWrapper kgCompressionView =
                    new DataOutputViewStreamWrapper(kgCompressionOut);
                kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
                cowStateStableSnapshots.get(kvState.getValue())
                    .writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
            } // this will just close the outer compression stream
        }
    }

    if (cancelStreamRegistry.unregisterCloseable(streamAndResultExtractor)) {
        KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
        SnapshotResult<StreamStateHandle> result =
            streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
        streamAndResultExtractor = null;
        logOperationCompleted(primaryStreamFactory, startTime);
        return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);
    }

    return SnapshotResult.empty();
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9263) Kafka010ITCase failed on travis flaky

2018-04-27 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9263:
-

 Summary: Kafka010ITCase failed on travis flaky
 Key: FLINK-9263
 URL: https://issues.apache.org/jira/browse/FLINK-9263
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Sihua Zhou
 Fix For: 1.5.0


instance: https://travis-ci.org/apache/flink/jobs/371952726

I found this
{code}
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 1: 
pool-26-thread-2 , Thread 2: pool-26-thread-1
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:858)
... 5 more
Caused by: java.lang.IllegalStateException: Concurrent access to 
KryoSerializer. Thread 1: pool-26-thread-2 , Thread 2: pool-26-thread-1
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:622)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:254)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.write(DefaultOperatorStateBackend.java:687)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:423)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
at 
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9260) Introduce a friendly way to resume the job from externalized checkpoints automatically

2018-04-26 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9260:
-

 Summary: Introduce a friendly way to resume the job from 
externalized checkpoints automatically
 Key: FLINK-9260
 URL: https://issues.apache.org/jira/browse/FLINK-9260
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


Currently, it is quite unfriendly for users to recover a job from an 
externalized checkpoint: the user needs to find the dedicated directory for the job, which 
is not an easy thing when there are too many jobs. This ticket intends to 
introduce a more friendly way to allow the user to use the externalized 
checkpoint to do recovery.

The implementation steps are copied from the comments of [~StephanEwen] in 
[9043|https://issues.apache.org/jira/browse/FLINK-9043]:

- We could make this an option where you pass a flag (-r) to automatically look 
for the latest checkpoint in a given directory.
- If more than one job checkpointed there before, this operation would fail.
- We might also need a way to have jobs not create the UUID subdirectory, 
otherwise the scanning for the latest checkpoint would not easily work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9251) Move MemoryStateBackend to flink-state-backends

2018-04-24 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9251:
-

 Summary: Move MemoryStateBackend to flink-state-backends
 Key: FLINK-9251
 URL: https://issues.apache.org/jira/browse/FLINK-9251
 Project: Flink
  Issue Type: Improvement
Reporter: Sihua Zhou


Since RocksDBBackend has been moved to flink-state-backends, we should also 
move MemoryStateBackend there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9243) SuccessAfterNetworkBuffersFailureITCase#testSuccessfulProgramAfterFailure is unstable

2018-04-23 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9243:
-

 Summary: 
SuccessAfterNetworkBuffersFailureITCase#testSuccessfulProgramAfterFailure is 
unstable
 Key: FLINK-9243
 URL: https://issues.apache.org/jira/browse/FLINK-9243
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Sihua Zhou
 Fix For: 1.5.0


There are some instances:
https://travis-ci.org/apache/flink/jobs/370121126
https://travis-ci.org/apache/flink/jobs/370022111
https://travis-ci.org/apache/flink/jobs/370055803



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9174) The type of state created in ProcessWindowFunction.process() is inconsistent

2018-04-14 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9174:
-

 Summary: The type of state created in 
ProcessWindowFunction.process() is inconsistent
 Key: FLINK-9174
 URL: https://issues.apache.org/jira/browse/FLINK-9174
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


The type of state created from windowState and globalState in 
{{ProcessWindowFunction.process()}} is inconsistent. In detail,
{code}
context.windowState().getListState(); // return type is HeapListState or 
RocksDBListState
context.globalState().getListState(); // return type is UserFacingListState
{code}

This causes a problem in the following code,
{code}
Iterable<T> iterableState = listState.get();
if (iterableState.iterator().hasNext()) {
    for (T value : iterableState) {
        value.setRetracting(true);
        collector.collect(value);
    }
    state.clear();
}
{code}
If the {{listState}} is created from {{context.globalState()}} it is fine, but 
when it is created from {{context.windowState()}} this will cause an NPE. I met this 
in 1.3.2 but found that it also affects 1.5.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9116) Introduce getAll and removeAll for MapState

2018-03-30 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9116:
-

 Summary: Introduce getAll and removeAll for MapState
 Key: FLINK-9116
 URL: https://issues.apache.org/jira/browse/FLINK-9116
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
 Fix For: 1.6.0


We have supported {{putAll(Map)}} in {{MapState}}; I think we should also 
support {{getAll(Iterable)}} and {{removeAll(Iterable)}} in {{MapState}}, as they 
can be convenient in some scenarios.
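
A rough sketch of what such bulk helpers could look like on top of the existing {{MapState}} API (illustrative only; the real FLINK-9116 additions could be proper interface methods that backends override with a single RocksDB multi-get or write batch):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;

/** Illustrative bulk helpers built on the existing per-key MapState methods. */
public final class MapStateBulkOps {

    public static <UK, UV> Map<UK, UV> getAll(MapState<UK, UV> state, Iterable<UK> keys) throws Exception {
        Map<UK, UV> result = new HashMap<>();
        for (UK key : keys) {
            UV value = state.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    public static <UK, UV> void removeAll(MapState<UK, UV> state, Iterable<UK> keys) throws Exception {
        for (UK key : keys) {
            state.remove(key);
        }
    }

    private MapStateBulkOps() {
    }
}
{code}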



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9102) Make the JobGraph disable queued scheduling for Flip6LocalStreamEnvironment

2018-03-28 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9102:
-

 Summary: Make the JobGraph disable queued scheduling for 
Flip6LocalStreamEnvironment
 Key: FLINK-9102
 URL: https://issues.apache.org/jira/browse/FLINK-9102
 Project: Flink
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


When we start a cluster locally with a fixed number of TMs and build a stream job with 
{{Flip6LocalStreamEnvironment}}, we should disable queued scheduling for the 
JobGraph.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9028) flip6 should check config before starting cluster

2018-03-20 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9028:
-

 Summary: flip6 should check config before starting cluster
 Key: FLINK-9028
 URL: https://issues.apache.org/jira/browse/FLINK-9028
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


In flip6, we should perform parameter checks before starting the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9022:
-

 Summary: fix resource close in 
`StreamTaskStateInitializerImpl.streamOperatorStateContext()`
 Key: FLINK-9022
 URL: https://issues.apache.org/jira/browse/FLINK-9022
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


We have the following code in 
{{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
incorrect:
{code:java}
} catch (Exception ex) {

    // cleanup if something went wrong before results got published.
    if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
        IOUtils.closeQuietly(keyedStatedBackend);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
        IOUtils.closeQuietly(keyedStatedBackend);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
        IOUtils.closeQuietly(rawKeyedStateInputs);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
        IOUtils.closeQuietly(rawOperatorStateInputs);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
        IOUtils.closeQuietly(rawOperatorStateInputs);
    }

    throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
{code}
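
For comparison, a sketch of what the corrected cleanup could look like (closing the resource that was actually unregistered and dropping the duplicated block); this mirrors the snippet above and is illustrative, not necessarily the exact committed fix:

{code:java}
} catch (Exception ex) {

    // cleanup if something went wrong before results got published.
    if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
        IOUtils.closeQuietly(keyedStatedBackend);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
        IOUtils.closeQuietly(operatorStateBackend); // close the operator state backend, not the keyed one
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
        IOUtils.closeQuietly(rawKeyedStateInputs);
    }

    if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
        IOUtils.closeQuietly(rawOperatorStateInputs);
    }

    throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
{code}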





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8968) Fix native resource leak caused by ReadOptions

2018-03-15 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8968:
-

 Summary: Fix native resource leak caused by ReadOptions 
 Key: FLINK-8968
 URL: https://issues.apache.org/jira/browse/FLINK-8968
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


We should pull the creation of ReadOptions out of the loop in 
{{RocksDBFullSnapshotOperation.writeKVStateMetaData()}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8927) Eagerly release the checkpoint object created from RocksDB

2018-03-12 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8927:
-

 Summary: Eagerly release the checkpoint object created from RocksDB
 Key: FLINK-8927
 URL: https://issues.apache.org/jira/browse/FLINK-8927
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


We should eagerly release the checkpoint object that is created from RocksDB, 
because it's a {{RocksObject}} (a native resource).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8918) Introduce Runtime Filter Join

2018-03-11 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8918:
-

 Summary: Introduce Runtime Filter Join
 Key: FLINK-8918
 URL: https://issues.apache.org/jira/browse/FLINK-8918
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


Right now, for every record that needs to be joined, we need to query both the `left stream's state` and the `right stream's state`. I propose to introduce an RF (runtime filter) join to reduce the `query count` of state, which could improve the performance of `stream join`, especially when the match rate is low. A simple description of runtime filtering can be found 
[here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html] (it was not written for stream joins originally, but the idea carries over easily); a rough sketch follows.
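
A very rough sketch of the idea for one side of the join; Guava's BloomFilter and the in-memory map below stand in for the real filter implementation and the right stream's keyed state, purely for illustration:
{code}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RuntimeFilterJoinSketch {
    // compact summary of the join keys seen on the right side so far
    private final BloomFilter<CharSequence> rightKeys =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    // stand-in for the right stream's keyed state
    private final Map<String, List<String>> rightState = new HashMap<>();

    void onRightRecord(String joinKey, String value) {
        rightKeys.put(joinKey); // remember the key in the filter
        rightState.computeIfAbsent(joinKey, k -> new ArrayList<>()).add(value);
    }

    void onLeftRecord(String joinKey, String value) {
        if (!rightKeys.mightContain(joinKey)) {
            return; // definitely no match on the right side: skip the state lookup entirely
        }
        List<String> candidates = rightState.get(joinKey); // possible match: query state as usual
        if (candidates != null) {
            // ... emit joined results from value and candidates ...
        }
    }
}
{code}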



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8859) RocksDB backend should pass WriteOption to Rocks.put() when restoring

2018-03-05 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8859:
-

 Summary: RocksDB backend should pass WriteOption to Rocks.put() 
when restoring
 Key: FLINK-8859
 URL: https://issues.apache.org/jira/browse/FLINK-8859
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


We should pass a `WriteOptions` to RocksDB.put() when restoring from state handles (both in full & incremental checkpoints). With `WriteOptions.setDisableWAL(true)`, the restore performance can be increased by about 2 times.
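
A minimal sketch of the intended call, assuming `db`, `columnFamilyHandle`, `key` and `value` are the objects already used in the restore loop:
{code}
try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
    // reuse the same writeOptions for every put() issued during the restore
    db.put(columnFamilyHandle, writeOptions, key, value);
}
{code}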



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8846) Introducing `parallel recovery` mode for incremental checkpoint

2018-03-03 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8846:
-

 Summary: Introducing `parallel recovery` mode for incremental checkpoint
 Key: FLINK-8846
 URL: https://issues.apache.org/jira/browse/FLINK-8846
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


Based on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, we can restore from an incremental checkpoint in parallel.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8845) Introducing `parallel recovery` mode for full checkpoint (savepoint)

2018-03-03 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8845:
-

 Summary: Introducing `parallel recovery` mode for full checkpoint (savepoint)
 Key: FLINK-8845
 URL: https://issues.apache.org/jira/browse/FLINK-8845
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0


Based on {{ingestExternalFile()}} and {{SstFileWriter}} provided by RocksDB, we can restore from a full checkpoint (savepoint) in parallel. This can also be extended to incremental checkpoints easily, but for the sake of simplicity we do it in two separate tasks.
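
A rough sketch of the two RocksDB primitives this is based on (paths, keys and error handling are illustrative, and the calls throw RocksDBException):
{code}
// 1) write the key/value pairs that belong to this sub-task into an SST file, in sorted key order
try (EnvOptions envOptions = new EnvOptions();
     Options options = new Options();
     SstFileWriter writer = new SstFileWriter(envOptions, options)) {
    writer.open("/tmp/restore-keygroup-0.sst");
    writer.put(new Slice(key), new Slice(value));
    writer.finish();
}

// 2) bulk-load the finished file(s) into the target db instead of inserting record by record
try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
    db.ingestExternalFile(Collections.singletonList("/tmp/restore-keygroup-0.sst"), ingestOptions);
}
{code}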



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8817) Decrement numPendingContainerRequests only when request container successfully

2018-02-28 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8817:
-

 Summary: Decrement numPendingContainerRequests only when request 
container successfully
 Key: FLINK-8817
 URL: https://issues.apache.org/jira/browse/FLINK-8817
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


We should decrement {{numPendingContainerRequests}} only when a container request succeeds. Otherwise, the re-requested container may be released, because the pending count no longer accounts for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8790) Improve performance for recovery from incremental checkpoint

2018-02-26 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8790:
-

 Summary: Improve performance for recovery from incremental 
checkpoint
 Key: FLINK-8790
 URL: https://issues.apache.org/jira/browse/FLINK-8790
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


When there are multiple state handles to be restored, we can improve the performance as follows:

1. Choose the best state handle to initialize the target db.
2. Use the other state handles to create temp dbs and clip each temp db to the target key group range (via rocksdb.deleteRange()); this lets us drop the per-record `key group check` in the `data insertion loop` and also avoids traversing useless records.
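
A sketch of the clipping step for one temp db; `keyGroupPrefixBytes()`, `MIN_KEY` and `MAX_KEY` are hypothetical helpers that serialize a key-group id to its key prefix and bound the key space:
{code}
byte[] rangeStart = keyGroupPrefixBytes(targetKeyGroupRange.getStartKeyGroup());
byte[] rangeEnd   = keyGroupPrefixBytes(targetKeyGroupRange.getEndKeyGroup() + 1);

// drop everything outside [rangeStart, rangeEnd) from the temp db with two range deletes
tempDb.deleteRange(columnFamilyHandle, MIN_KEY, rangeStart);
tempDb.deleteRange(columnFamilyHandle, rangeEnd, MAX_KEY);
{code}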



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8777) improve resource release when recovery from failover

2018-02-26 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8777:
-

 Summary: improve resource release when recovery from failover
 Key: FLINK-8777
 URL: https://issues.apache.org/jira/browse/FLINK-8777
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


When recovering from a failure, {{TaskLocalStateStoreImpl.retrieveLocalState()}} is invoked. There we can release every entry of {{storedTaskStateByCheckpointID}} that does not satisfy {{entry.checkpointID == checkpointID}}; this prevents a resource leak when a job keeps looping through {{local checkpoint completed => failed => local checkpoint completed => failed ...}}.
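
A small sketch of the pruning, assuming the map is keyed by checkpoint id and {{discardLocalState()}} is a hypothetical helper that deletes the local files behind a snapshot:
{code}
Iterator<Map.Entry<Long, TaskStateSnapshot>> it =
        storedTaskStateByCheckpointID.entrySet().iterator();
while (it.hasNext()) {
    Map.Entry<Long, TaskStateSnapshot> entry = it.next();
    if (entry.getKey() != checkpointID) {
        discardLocalState(entry.getValue()); // hypothetical helper: release the files of this snapshot
        it.remove();
    }
}
{code}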



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8753) Introduce Incremental savepoint

2018-02-22 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8753:
-

 Summary: Introduce Incremental savepoint
 Key: FLINK-8753
 URL: https://issues.apache.org/jira/browse/FLINK-8753
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Right now, a savepoint goes through the full checkpoint path, so taking a savepoint can be slow. In our production, for some long running jobs it often takes more than 10 minutes to complete a savepoint, which is unacceptable for a real-time job, so we currently have to fall back to externalized checkpoints instead. But the latest externalized checkpoint can lag behind by up to one checkpoint interval. So I propose to introduce an incremental savepoint that goes through the incremental checkpoint path.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8699) Fix concurrency problem in rocksdb full checkpoint

2018-02-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8699:
-

 Summary: Fix concurrency problem in rocksdb full checkpoint
 Key: FLINK-8699
 URL: https://issues.apache.org/jira/browse/FLINK-8699
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


In the full checkpoint, `kvStateInformation` is not a copied object and it can be modified while writeKVStateMetaData() is running. This can lead to serious problems.
{code}
private void writeKVStateMetaData() throws IOException {
    // ...
    for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>> column :
            stateBackend.kvStateInformation.entrySet()) {
        // iterates over the live map while other threads may still register new states
    }
    // ...
}
{code}
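
One possible direction is to copy the meta info into a local list under the same guard that protects state registration and iterate over that stable copy; the lock object and the raw types below are placeholders:
{code}
final List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>> metaInfoCopy;
synchronized (stateBackend) { // placeholder: whatever guard also protects state registration
    metaInfoCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
}
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo> column : metaInfoCopy) {
    // ... write the meta data from the copy, no concurrent modification possible ...
}
{code}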



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8679:
-

 Summary: RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't 
filter data with namespace
 Key: FLINK-8679
 URL: https://issues.apache.org/jira/browse/FLINK-8679
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it doesn't use the namespace to filter the data. `HeapKeyedBackend.getKeys(stateName, namespace)` does filter by namespace, so I think the two should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8676) Memory Leak in AbstractKeyedStateBackend.applyToAllKeys() when backend is base on RocksDB

2018-02-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8676:
-

 Summary: Memory Leak in AbstractKeyedStateBackend.applyToAllKeys() 
when backend is base on RocksDB
 Key: FLINK-8676
 URL: https://issues.apache.org/jira/browse/FLINK-8676
 Project: Flink
  Issue Type: Bug
Reporter: Sihua Zhou
Assignee: Sihua Zhou


`AbstractKeyedStateBackend.applyToAllKeys()` uses the backend's getKeys(stateName, namespace) to get all keys that belong to `namespace`. But `RocksDBKeyedStateBackend.getKeys()` just returns an object that wraps a `rocksdb iterator`; that is dangerous, because RocksDB pins the resources that belong to the iterator in memory until iterator.close() is invoked, and close() is never called right now. This finally leads to a memory leak.
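
Since `getKeys()` returns a `java.util.stream.Stream`, one way to fix this is to tie the native iterator's lifecycle to the stream and close the stream in `applyToAllKeys()`; the spliterator class below is hypothetical:
{code}
// in RocksDBKeyedStateBackend.getKeys(): let the returned stream own the native iterator
RocksIterator iterator = db.newIterator(columnFamilyHandle);
Stream<K> result = StreamSupport.stream(new RocksDBKeySpliterator<>(iterator), false)
        .onClose(iterator::close); // releases the pinned native resources when the stream is closed

// in applyToAllKeys(): make sure the stream (and thus the iterator) is always closed
try (Stream<K> keyStream = getKeys(stateName, namespace)) {
    keyStream.forEach(key -> { /* apply the given function to this key */ });
}
{code}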



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8657) Fix incorrect description for external checkpoint vs savepoint

2018-02-14 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8657:
-

 Summary: Fix incorrect description for external checkpoint vs 
savepoint
 Key: FLINK-8657
 URL: https://issues.apache.org/jira/browse/FLINK-8657
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


I checked that externalized checkpoints also support rescaling, both in the code and in practice. But the documentation still notes that they "do not support Flink specific features like rescaling."

I am not sure whether I have missed something; if so, please just close this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8639) Fix always need to seek multiple times when iterator RocksDBMapState

2018-02-12 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8639:
-

 Summary: Fix always need to seek multiple times when iterator 
RocksDBMapState
 Key: FLINK-8639
 URL: https://issues.apache.org/jira/browse/FLINK-8639
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.5.0


Currently, almost every time we want to iterate a RocksDBMapState we need to seek at least 2 times (seek is an expensive operation for RocksDB because it cannot use the bloom filter). This is because `RocksDBMapIterator` uses a `cacheEntries` list to cache the values fetched by each seek, and the initial size of `cacheEntries` is 1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8602) Accelerate recover from failover when use incremental checkpoint

2018-02-08 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8602:
-

 Summary: Accelerate recover from failover when use incremental 
checkpoint
 Key: FLINK-8602
 URL: https://issues.apache.org/jira/browse/FLINK-8602
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, when incremental checkpoints are enabled, if the user changes the parallelism then `hasExtraKeys` may be `true`. When this occurs, Flink loops over all RocksDB instances and iterates over all their data to fetch the records that fall into the current `KeyGroupRange`. This can be improved:

 if a state handle's `KeyGroupRange` is fully covered by the current `KeyGroupRange`, we can open the RocksDB instance it corresponds to directly, as sketched below.
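
A sketch of the check, assuming `targetKeyGroupRange` is the range this sub-task is responsible for and `handleKeyGroupRange` comes from the state handle:
{code}
boolean fullyCovered =
        targetKeyGroupRange.getStartKeyGroup() <= handleKeyGroupRange.getStartKeyGroup()
                && handleKeyGroupRange.getEndKeyGroup() <= targetKeyGroupRange.getEndKeyGroup();

if (fullyCovered) {
    // open the RocksDB instance behind this handle directly, no per-record copying needed
} else {
    // fall back to the current path: iterate the data and re-insert the matching key groups
}
{code}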

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8601) Introduce LinkedBloomFilterState for Approximate calculation and other situations of performance optimization

2018-02-07 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8601:
-

 Summary: Introduce LinkedBloomFilterState for Approximate 
calculation and other situations of performance optimization
 Key: FLINK-8601
 URL: https://issues.apache.org/jira/browse/FLINK-8601
 Project: Flink
  Issue Type: New Feature
  Components: Core, DataStream API
Affects Versions: 1.4.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


h3. Background

Bloom filters are useful in many situations, for example:
 * 1. Approximate calculation: deduplication (e.g. UV calculation)
 * 2. Performance optimization: e.g. [runtime filter join|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]

However, based on the state primitives currently provided by Flink, it is hard to use a bloom filter, for the following reasons:
 * 1. Serialization problem: bloom filter state can be large (for example 100 MB); if it is implemented on top of RocksDB state, the state data has to be serialized on every query and update, and the performance will be very poor.
 * 2. Data skew: data in different key groups can be skewed, and the skew cannot be accurately predicted before the program runs. Therefore it is impossible to determine in advance how many resources the bloom filter should allocate. One way is to allocate the space needed for the most skewed case, but this leads to a very serious waste of resources.

h3. Requirement

Therefore, I introduce the LinkedBloomFilterState for Flink, which at least needs to meet the following features (a purely illustrative usage sketch follows the list):
 * 1. Support for changing parallelism
 * 2. Only serialize when necessary, i.e. when performing a checkpoint
 * 3. Can deal with the data skew problem: users only need to specify a LinkedBloomFilterState with the desired input size and fpp, and the system allocates resources dynamically
 * 4. Does not conflict with other state: users can still use KeyedState and OperatorState while using the bloom filter state
 * 5. Supports a relaxed TTL (i.e. the data survives at least as long as the specified time)
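
To make the intended usage concrete, here is a purely illustrative sketch; every name in it (the descriptor, the state handle and their methods) is hypothetical and not part of any existing Flink API:
{code}
// hypothetical descriptor: expected insertions, target false positive probability, relaxed TTL
LinkedBloomFilterDescriptor descriptor =
        new LinkedBloomFilterDescriptor("uv-filter", 10_000_000, 0.01)
                .setRelaxedTtl(Time.hours(24));

// hypothetical accessor on the runtime context, analogous to getState()/getMapState()
LinkedBloomFilterState seenUsers = getRuntimeContext().getLinkedBloomFilterState(descriptor);

if (!seenUsers.mightContain(userId)) {
    seenUsers.add(userId);
    uvCounter.add(1L); // first time we see this user: count it (uvCounter is also illustrative)
}
{code}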

Design doc: coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8018) RMQ does not support disabling queueDeclare, when the user has no declaration permissions, it cannot connect

2017-11-07 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-8018:
-

 Summary: RMQ does not support disabling queueDeclare, when the 
user has no declaration permissions, it cannot connect
 Key: FLINK-8018
 URL: https://issues.apache.org/jira/browse/FLINK-8018
 Project: Flink
  Issue Type: Bug
  Components: RabbitMQ Connector
Affects Versions: 1.3.2
Reporter: Sihua Zhou
Assignee: Sihua Zhou


The RabbitMQ connector should support disabling the call to queueDeclare, in case the user does not have sufficient permissions to declare the queue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7873) Introduce HybridStreamStateHandle for quick recovery from checkpoint.

2017-10-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-7873:
-

 Summary: Introduce HybridStreamStateHandle for quick recovery from 
checkpoint.
 Key: FLINK-7873
 URL: https://issues.apache.org/jira/browse/FLINK-7873
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.3.2
Reporter: Sihua Zhou
Assignee: Sihua Zhou


The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of network traffic when the state is big (e.g. 1 TB); that cost can be saved by reading the checkpoint data from the local disk. So I introduce a HybridStreamStateHandle which tries to create a local input stream first and, if that fails, creates a remote input stream. Its prototype looks like below:
{code:java}
class HybridStreamStateHandle {
    private FileStateHandle localHandle;
    private FileStateHandle remoteHandle;
    // ...
    public FSDataInputStream openInputStream() throws IOException {
        FSDataInputStream inputStream = localHandle.openInputStream();
        return inputStream != null ? inputStream : remoteHandle.openInputStream();
    }
    // ...
}
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7219) Current allocate strategy can't achieve the optimal effect with input's location

2017-07-18 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-7219:
-

 Summary: Current allocate strategy can't achieve the optimal effect with input's location
 Key: FLINK-7219
 URL: https://issues.apache.org/jira/browse/FLINK-7219
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.3.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou


This is the second subtask of issue [FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153?filter=-2].

The current allocation strategy can't allocate slots optimally. Here is the test case:
{code}
JobVertex v1 = new JobVertex("v1", jid1);
JobVertex v2 = new JobVertex("v2", jid2);

SlotSharingGroup group = new SlotSharingGroup();

v1.setSlotSharingGroup(group);
v2.setSlotSharingGroup(group);

v1.setParallelism(2);
v2.setParallelism(4);

v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);

v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED_BOUNDED);
{code}
Currently, after allocating slots for v1 and v2, we get 1 local partition and 3 remote partitions, but it should actually be 2 local partitions and 2 remote partitions.

The cause of this is that the current strategy allocates the resource for each execution one by one (if the execution can get a slot from the SlotSharingGroup it takes it, otherwise it asks for a new one).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7218) ExecutionVertex.getPreferredLocationsBasedOnInputs() will always return empty

2017-07-18 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-7218:
-

 Summary: ExecutionVertex.getPreferredLocationsBasedOnInputs() will 
always return empty
 Key: FLINK-7218
 URL: https://issues.apache.org/jira/browse/FLINK-7218
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.3.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou


This is a subtask of [FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153?filter=-2].

ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns empty, because `sourceSlot` is always null until the `ExecutionVertex` has been deployed via 'Execution.deployToSlot()'. So allocating resources based on the preferred locations can't work correctly; we need to set the slot info on the `Execution` as soon as Execution.allocateSlotForExecution() has been called successfully.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7180) CoGroupStream perform checkpoint failed

2017-07-14 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-7180:
-

 Summary: CoGroupStream perform checkpoint failed
 Key: FLINK-7180
 URL: https://issues.apache.org/jira/browse/FLINK-7180
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.3.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou


When using the CoGroup API with checkpointing enabled, the job fails when performing a checkpoint, e.g.:
{code:java}
input1.coGroup(input2)
        .where(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .equalTo(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .window(SlothJoinWindow.create())
        .trigger(new SlothWindowTrigger(0))
        .apply(new CoGroupFunction<String, String, String>() {
            @Override
            public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                String outputStr = "first:" + first + " , second:" + second;
                System.out.println(outputStr);
                out.collect(outputStr);
            }
        })
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .print();
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7160) Support hive like udtf

2017-07-11 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-7160:
-

 Summary: Support hive like udtf
 Key: FLINK-7160
 URL: https://issues.apache.org/jira/browse/FLINK-7160
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Affects Versions: 1.3.1
Reporter: Sihua Zhou


Support one row in and multi-column/multi-row out (one-to-many mapping), just like UDTF in Hive.
The query would look like this:
{code}
select udtf(arg1, arg2) as (seg1, seg2, seg3) from table
{code}
This is meant to be friendly to users migrating from Hive. At present, Calcite doesn't support this syntax, but it will support it later, see [CALCITE-1581|https://issues.apache.org/jira/browse/CALCITE-1581].




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6980) TypeExtractor.getForObject can't get typeinfo correctly.

2017-06-22 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-6980:
-

 Summary: TypeExtractor.getForObject can't get typeinfo correctly.
 Key: FLINK-6980
 URL: https://issues.apache.org/jira/browse/FLINK-6980
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.3.0
Reporter: Sihua Zhou
Priority: Minor


Here is my class definition:

_class MyRecord extends Row implements Retracting, Value {}_

When I use it like below, it just throws a type cast error: java.lang.ClassCastException: org.apache.flink.types.Row cannot be cast to org.apache.flink.types.Value

{code:java}
MyRecord[] recordList = new MyRecord[6];
DataStream dataStream = env.fromElements(recordList);
// MyFilter's input argument type is MyRecord.
dataStream.flatMap(new MyFilter()).returns(MyRecord.class).print();
{code}

I found this is because TypeExtractor.getForObject, which is called in env.fromElements(), can't get the element's type correctly, while TypeExtractor.getForObject works correctly in Flink 1.2.0.

I know this problem can be worked around by using env.fromElements(MyRecord.class, recordList) instead; I just want to know whether this is a bug or not. Why does it work correctly in 1.2.0 but not in 1.3.0?




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)