Hi Aljoscha,

thanks for the reply. Do you see any issues in implementing the list state the way Fabian suggested (i.e. using the MapState)? I feel there are some open questions, mostly because InternalListState (which I suppose the RocksDBListState should implement) extends InternalKvState, which in turn suggests that the correct implementation *should* behave exactly as the current implementation does: serialize the list into one field and store it as a single key-value pair. Do you think there would be any major issues with this?

Many thanks,

 Jan


On 12/13/2017 02:22 PM, Aljoscha Krettek wrote:
Hi,

If I remember correctly, there was actually an effort to change the RocksDB list 
state the way you described. I'm cc'ing Stephan, who was involved in that and this is 
the Jira issue: https://issues.apache.org/jira/browse/FLINK-5756 

Best,
Aljoscha

On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU 
<ovidiu-cristian.ma...@inria.fr> wrote:

Hi Jan,

You could associate a key with each element of your Key's list (e.g., by hashing
the value), keep only those keys on the heap (e.g., in a list), and keep the
associated key-value pairs in an external store like RocksDB/Redis. You will notice
large overheads due to de/serialization, though: a huge penalty once a Key holds
thousands of elements (see https://hal.inria.fr/hal-01530744/document for some
experimental settings), even for a relatively small rate of new events per Key, if
you need to process all values of a Key for each new event. In the best case you can
do some incremental processing, unless by non-combining you mean non-associative
operations per Key.

Best,
Ovidiu
On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:

Hi Fabian,

thanks for the quick reply. What you suggest seems to work at first sight; I
will try it. Is there any reason not to implement a RocksDBListState this way
in general? Is there any increased overhead with this approach?

Thanks,

Jan


On 12/12/2017 11:17 AM, Fabian Hueske wrote:
Hi Jan,

I cannot comment on the internal design, but you could put the data into a
MapState<Integer, X> backed by the RocksDBStateBackend, where the value X is
your data type and the key is the list index. You would need another
ValueState for the current number of elements that you put into the MapState.
A MapState lets you fetch and traverse the key, value, or entry set of the
map without loading it completely into memory.
The sets are traversed in sort order of the key, so they should be in
insertion order (given that you properly increment the list index).
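
The idea above can be sketched roughly as follows. This is only a stand-in
model, not Flink code: the TreeMap plays the role of the MapState<Integer, X>,
the int field plays the role of the extra ValueState, and the class name
IndexedListSketch is made up for illustration.

```java
import java.util.TreeMap;

/**
 * Stand-in sketch of a list kept as (index -> element) entries plus a counter.
 * Nothing here is a Flink class; the names are hypothetical.
 */
final class IndexedListSketch<X> {
    // Hypothetical stand-in for MapState<Integer, X>; iterates in key order.
    private final TreeMap<Integer, X> entries = new TreeMap<>();
    // Hypothetical stand-in for the ValueState holding the element count.
    private int size = 0;

    /** Appends by writing one new entry; existing entries are not rewritten. */
    void add(X value) {
        entries.put(size, value);
        size++;
    }

    /** Traverses in key order, which equals insertion order for this scheme. */
    Iterable<X> values() {
        return entries.values();
    }

    int size() {
        return size;
    }

    public static void main(String[] args) {
        IndexedListSketch<String> list = new IndexedListSketch<>();
        list.add("a");
        list.add("b");
        list.add("c");
        for (String s : list.values()) {
            System.out.println(s);
        }
    }
}
```

In a real job the two pieces of state would be obtained from the runtime
context and scoped to the current key; the sketch only shows why appending
touches a single entry and why traversal order matches insertion order.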

Best, Fabian

2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:

Hi all,

I have a question that looks like a user@ question, but it brought me to
the dev@ mailing list while I was browsing through Flink's source
code. First I'll try to briefly describe my use case. I'm trying to do a
group-by-key operation with a limited number of distinct keys (which I
cannot control), but a non-trivial count of values. The operation in the
GBK is non-combining, so all values per key (many) have to be stored
in state. Running this on testing data led to a surprise (for me): even
when using RocksDBStateBackend, the whole list of data is serialized
into a single binary blob and then deserialized into a List, and therefore
has to fit in memory (multiple times, in fact).

I tried to create an alternative RocksDBStateBackend that would store
each element of a ListState under a separate key in RocksDB, so that the
whole blob would not have to be loaded by a single get, but a scan over
multiple keys could be made. Digging into the source code I found there is
a hierarchy of classes mirroring the public API in the 'internal' package:
InternalKvState, InternalMergingState, InternalListState, and so on. These
classes, however, have a different hierarchy than the public API classes that
they mirror; most notably, InternalKvState is a superinterface of all others.
This fact seems to be used in multiple places throughout the source code.

My question is: is this intentional? Would it be possible to store each
element of a ListState under a separate key in RocksDB (probably by adding
some suffix to the actual key of the state for each element)? What are the
pitfalls? And is it necessary for InternalListState to actually be a
subinterface of InternalKvState? I find this to be a related problem.
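
To make the "suffix per element" idea concrete, here is a minimal sketch under
stated assumptions. A TreeMap with an unsigned byte-wise comparator stands in
for the RocksDB key space, and the key layout (length-prefixed state key
followed by a big-endian index) is a hypothetical encoding chosen for
illustration, not the layout Flink uses. The class and method names are made
up as well.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.TreeMap;

/** Stand-in sketch: one store entry per list element, found again by a range scan. */
final class SuffixedKeySketch {
    // Unsigned lexicographic byte order, as used by RocksDB's default comparator.
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Hypothetical stand-in for the RocksDB key space.
    private final TreeMap<byte[], String> store = new TreeMap<>(BYTEWISE);

    /** Assumed layout: 4-byte key length, key bytes, 4-byte big-endian index. */
    static byte[] compositeKey(String stateKey, int index) {
        byte[] k = stateKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length + 4)
                .putInt(k.length)
                .put(k)
                .putInt(index)
                .array();
    }

    /** Writes one element; the caller supplies the next non-negative index. */
    void append(String stateKey, int index, String element) {
        store.put(compositeKey(stateKey, index), element);
    }

    /** Range scan over the elements of one state key, in index order. */
    Iterable<String> scan(String stateKey) {
        return store.subMap(
                compositeKey(stateKey, 0), true,
                compositeKey(stateKey, Integer.MAX_VALUE), true).values();
    }

    public static void main(String[] args) {
        SuffixedKeySketch s = new SuffixedKeySketch();
        s.append("user-1", 0, "first");
        s.append("user-1", 1, "second");
        for (String v : s.scan("user-1")) {
            System.out.println(v);
        }
    }
}
```

The length prefix keeps one state key from being a byte prefix of another, and
the big-endian encoding makes byte order agree with numeric order for
non-negative indices, so the scan returns elements in insertion order. The
sketch does not address how the next index is tracked or how such a layout
would fit the InternalKvState contract.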

Many thanks for any comments or thoughts,

Jan



