Hi Jan,

You could associate a key with each element of your per-key list (e.g., by hashing the value), keep only the keys on the heap (e.g., in a list), and store the associated state key-value(s) in an external store like RocksDB/Redis. However, you will notice large overheads due to de/serialization, which becomes a huge penalty beyond thousands of elements (see https://hal.inria.fr/hal-01530744/document for some experimental settings), even at a relatively small rate of new events per key, if all values of a key need to be processed for each new event. In the best case you can do some incremental processing, unless by non-combining you mean the operations per key are non-associative.
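As a rough illustration of the per-element keying described above, here is a minimal, Flink-free sketch assuming a hypothetical codec (the class and method names are illustrative, not Flink or RocksDB API) that derives one store key per list element by suffixing the state key with an element index:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not Flink/RocksDB API): derives one store key per
// list element by suffixing the state key with a fixed-width element index.
public class ElementKeyCodec {

    public static byte[] encode(String stateKey, long elementIndex) {
        byte[] prefix = stateKey.getBytes(StandardCharsets.UTF_8);
        // Big-endian long suffix: keys sharing a prefix sort by index, so a
        // prefix scan over the store returns elements in insertion order.
        // (Note: a real RocksDB comparator orders bytes unsigned; this layout
        // is order-preserving for non-negative indices.)
        return ByteBuffer.allocate(prefix.length + Long.BYTES)
                .put(prefix)
                .putLong(elementIndex)
                .array();
    }
}
```

One entry per element means a single get never has to materialize the whole list; the trade-off is one store lookup (and one de/serialization) per element instead of per list.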
Best,
Ovidiu

> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Fabian,
>
> thanks for the quick reply, what you suggest seems to work at first sight, I will try it. Is there any reason not to implement a RocksDBListState this way in general? Is there any increased overhead to this approach?
>
> Thanks,
>
> Jan
>
>
> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>> Hi Jan,
>>
>> I cannot comment on the internal design, but you could put the data into a RocksDBStateBackend MapState<Integer, X>, where the value X is your data type and the key is the list index. You would need another ValueState for the current number of elements that you put into the MapState.
>> A MapState allows fetching and traversing the key, value, or entry set of the map without loading it completely into memory.
>> The sets are traversed in sort order of the key, so they should be in insertion order (given that you properly increment the list index).
>>
>> Best,
>> Fabian
>>
>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>
>>> Hi all,
>>>
>>> I have a question that looks like a user@ question, but it brought me to the dev@ mailing list while I was browsing through Flink's source code. First I'll briefly describe my use case. I'm trying to do a group-by-key operation with a limited number of distinct keys (which I cannot control) but a non-trivial count of values. The operation in the GBK is non-combining, so all values per key (many) have to be stored in state. Running this on test data led to a surprise (for me): even when using RocksDBStateBackend, the whole list of data is serialized into a single binary blob and then deserialized into a List, and therefore has to fit in memory (multiple times, in fact).
>>>
>>> I tried to create an alternative RocksDBStateBackend that would store each element of a ListState under a separate key in RocksDB, so that the whole blob would not have to be loaded by a single get; instead, a scan over multiple keys could be made. Digging into the source code, I found that there is a hierarchy of classes mirroring the public API in the 'internal' package - InternalKvState, InternalMergingState, InternalListState, and so on. These classes, however, have a different hierarchy than the public API classes they mirror; most notably, InternalKvState is a superinterface of all the others. This fact seems to be relied upon in multiple places throughout the source code.
>>>
>>> My question is - is this intentional? Would it be possible to store each element of a ListState under a separate key in RocksDB (probably by adding some suffix to the actual key of the state for each element)? What are the pitfalls? And is it necessary for InternalListState to actually be a subinterface of InternalKvState? I find this to be a related problem.
>>>
>>> Many thanks for any comments or thoughts,
>>>
>>> Jan
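For reference, Fabian's MapState-plus-counter suggestion from the thread above can be sketched without Flink at all; in this stand-in (illustrative names, not the Flink API) a TreeMap plays the role of the RocksDB-backed MapState<Integer, X> and a plain field plays the role of the extra ValueState holding the element count:

```java
import java.util.Iterator;
import java.util.TreeMap;

// Illustrative stand-in (not Flink API): a sorted map mimics the
// RocksDB-backed MapState<Integer, X>; `count` mimics the ValueState
// that tracks the next free list index.
public class ListOverMapStateSketch<X> {

    private final TreeMap<Integer, X> mapState = new TreeMap<>();
    private int count = 0; // next list index, would live in a ValueState

    public void add(X element) {
        mapState.put(count++, element); // one store entry per element
    }

    // Entries come back in key order, i.e. insertion order. A real
    // RocksDB-backed MapState would stream them one at a time instead of
    // materializing the whole list in memory.
    public Iterator<X> get() {
        return mapState.values().iterator();
    }

    public int size() {
        return count;
    }
}
```

The key point of the approach is in `get()`: iteration happens entry by entry in key order, so appending is a single put and reading never deserializes the list as one blob.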