Re: RocksDB backend with deferred writes?

2019-04-29 Thread Congxian Qiu
Hi, David

When you flush data to the db, you can reuse the key-serialization logic[1] and store
the serialized bytes in RocksDB.

[1] 
https://github.com/apache/flink/blob/c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java#L136
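As a rough illustration of the layout that builder produces (a fixed-width key-group prefix followed by the serialized key bytes), here is a hypothetical sketch. The class and method names are made up for illustration, and a real Flink backend serializes the key with its TypeSerializer rather than raw UTF-8 bytes:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a composite RocksDB key: key-group prefix + key bytes.
public class CompositeKeySketch {
    public static byte[] buildCompositeKey(int keyGroup, String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] composite = new byte[2 + keyBytes.length];
        // Fixed-width big-endian key-group prefix keeps each key group's
        // entries contiguous in RocksDB's sorted key space, which is what
        // makes per-key-group iteration and rescaling cheap.
        composite[0] = (byte) (keyGroup >>> 8);
        composite[1] = (byte) keyGroup;
        // Append the serialized key after the prefix.
        System.arraycopy(keyBytes, 0, composite, 2, keyBytes.length);
        return composite;
    }
}
```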

Best, Congxian


Re: RocksDB backend with deferred writes?

2019-04-28 Thread aitozi
Hi, David

I opened an issue about this before, and @Andrey Zagrebin and @Aljoscha Krettek 
suggested either extending AbstractStreamOperator to customize how the operator 
works with state, or extending the state backend to add a cache layer on top of it.

FYI: 
https://issues.apache.org/jira/browse/FLINK-10343?focusedCommentId=16614992&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16614992

A new state backend was also presented by Alibaba at Flink Forward China 2018. 
It has not been open-sourced, but you can take a look at the slides:

 

https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf

 

Thanks,

Aitozi

 



Re: RocksDB backend with deferred writes?

2019-04-28 Thread David J. C. Beach
Thanks Aitozi.

Your answer makes good sense and I'm trying to implement this now.  My code
is written as a KeyedProcessFunction, but I can't see where this exposes
the KeyContext interface.  Is there anything you can point me to in the
docs?

Best,
David




Re: RocksDB backend with deferred writes?

2019-04-28 Thread aitozi
Hi, David

The RocksDB keyed backend operates under a key context: every state operation must 
first call setCurrentKey, so the backend knows the current key and can compute the 
current key group. These two pieces together determine how state interacts with the 
underlying RocksDB.

I think you can achieve your goal either by calling setCurrentKey before flushing to 
RocksDB, or by building the prefixed key (key group + key) yourself when you put/get 
values to/from RocksDB.
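A minimal sketch of the key-group computation, under the simplified scheme described above. Flink's actual assignment additionally murmur-hashes the key's hashCode() before taking the modulo; the class name here is hypothetical:

```java
// Hypothetical sketch: map a key to a key group in [0, maxParallelism).
public class KeyGroupSketch {
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        // (Flink murmur-hashes key.hashCode() first; omitted here for brevity.)
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

A deferred flush would call this for each cached key to recover the key group that a plain setCurrentKey would otherwise have established.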

 

Thanks,

Aitozi

 

 



RocksDB backend with deferred writes?

2019-04-28 Thread David J. C. Beach
I have a stateful operator in a task which processes thousands of elements
per second (per task) when using the Filesystem backend.  As documented and
reported by other users, when I switch to the RocksDB backend, throughput
is considerably lower.  I need something that combines the high performance
of in-memory with the large state and incremental checkpointing of RocksDB.

For my application, I can *almost* accomplish this by including a caching
layer which maintains a map of pending (key, value) writes.  These are
periodically flushed to the RocksDB (and always prior to a checkpoint).
This greatly reduces the number of writes to RocksDB, and means that I can
get a "best of both worlds" in terms of throughput and
reliability/incremental checkpointing.  These optimizations make sense for
my workload, since events which operate on the same key tend to be close
together in the stream.  (Over the long haul, there are many millions of
keys, and occasionally, an event references some key from the distant past,
hence the need for RocksDB.)
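A minimal sketch of such a write-behind cache, under the assumptions above. All names here are hypothetical, and the flush callback stands in for the backend write, which is exactly the step that needs to be made key-group aware (e.g. by calling setCurrentKey per key before writing):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical write-behind cache: pending writes are buffered per key and
// flushed in bulk, e.g. just before a checkpoint.
public class DeferredWriteCache<K, V> {
    private final Map<K, V> pending = new HashMap<>();

    public void put(K key, V value) {
        pending.put(key, value);          // repeated writes to one key collapse
    }

    public V get(K key) {
        return pending.get(key);          // reads must consult the cache first
    }

    // Flush all pending entries through the callback and return how many
    // distinct keys were written to the backend.
    public int flush(BiConsumer<K, V> writeToBackend) {
        int flushed = pending.size();
        pending.forEach(writeToBackend);  // one backend write per distinct key
        pending.clear();
        return flushed;
    }
}
```

On a cache miss, a real implementation would also fall through to the backend read, which again requires the correct key context.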

Unfortunately, this solution does not work perfectly because when I do
eventually flush writes to the underlying RocksDB backend, the stateful
processor may be operating on an element which belongs to a different key
group.  Hence, the elements that I flush are associated with the wrong key
group, and things don't work quite right.

*Is there any way to wrap the RocksDB backend with caching and deferred
writes (in a way which is "key-group aware")?*

Thanks!

David