Hi all,

I have a question that looks like a user@ question, but it brought me to the dev@ mailing list while I was browsing through the Flink source code. First, I'll briefly describe my use case. I'm trying to do a group-by-key operation with a limited number of distinct keys (which I cannot control), but a non-trivial number of values. The operation in the GBK is non-combining, so all of the (many) values per key have to be stored in state. Running this on testing data led to a surprise (for me): even when using RocksDBStateBackend, the whole list is serialized into a single binary blob and then deserialized into a List, and therefore has to fit in memory (multiple times, in fact).

I tried to create an alternative RocksDBStateBackend that would store each element of a ListState under a separate key in RocksDB, so that the whole blob would not have to be loaded by a single get, and a scan over multiple keys could be used instead. Digging into the source code, I found a hierarchy of classes in the 'internal' package that mirrors the public API: InternalKvState, InternalMergingState, InternalListState, and so on. These classes, however, have a different hierarchy than the public API classes they mirror; most notably, InternalKvState is a superinterface of all the others. This fact seems to be relied upon in multiple places throughout the source code.

My question is: is this intentional? Would it be possible to store each element of a ListState under a separate key in RocksDB (probably by appending some suffix to the actual key of the state for each element)? What are the pitfalls? And is it necessary for InternalListState to actually be a subinterface of InternalKvState? I find this to be a related problem.
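To make the idea concrete, here is a minimal sketch of the key layout I have in mind. It does not use any Flink or RocksDB classes: a TreeMap ordered by unsigned byte comparison stands in for RocksDB's sorted key space, and all names (PerElementListSketch, compositeKey, scan) are hypothetical. I assume the suffix is a per-key sequence number, and that the state key is length-prefixed so that a scan for one key cannot run into the elements of another key that merely shares a prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical sketch: one store entry per list element instead of one blob per list.
class PerElementListSketch {
    // Stand-in for RocksDB: entries sorted by unsigned, byte-wise key order.
    private final TreeMap<byte[], byte[]> db = new TreeMap<>(Arrays::compareUnsigned);
    // Next sequence number per state key (assumption: a real backend would have to persist this).
    private final Map<String, Long> nextSeq = new HashMap<>();

    // Prefix = 4-byte key length + key bytes, so "a" is never a prefix of "ab".
    private static byte[] prefix(String stateKey) {
        byte[] k = stateKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length).putInt(k.length).put(k).array();
    }

    // Composite key = prefix + 8-byte big-endian sequence number (the per-element suffix).
    private static byte[] compositeKey(String stateKey, long seq) {
        byte[] p = prefix(stateKey);
        return ByteBuffer.allocate(p.length + 8).put(p).putLong(seq).array();
    }

    private static boolean startsWith(byte[] key, byte[] p) {
        return key.length >= p.length && Arrays.equals(key, 0, p.length, p, 0, p.length);
    }

    // Appending writes a single small entry; the existing list is never read.
    void add(String stateKey, byte[] serializedElement) {
        long seq = nextSeq.merge(stateKey, 1L, Long::sum) - 1;
        db.put(compositeKey(stateKey, seq), serializedElement);
    }

    // Reading is a range scan from the prefix; elements are handed out one at a time.
    Iterator<byte[]> scan(String stateKey) {
        byte[] p = prefix(stateKey);
        Iterator<Map.Entry<byte[], byte[]>> it = db.tailMap(p, true).entrySet().iterator();
        return new Iterator<byte[]>() {
            private Map.Entry<byte[], byte[]> pending = advance();

            // Returns the next entry of this key, or null once the scan leaves the prefix.
            private Map.Entry<byte[], byte[]> advance() {
                if (it.hasNext()) {
                    Map.Entry<byte[], byte[]> e = it.next();
                    if (startsWith(e.getKey(), p)) {
                        return e;
                    }
                }
                return null;
            }

            @Override
            public boolean hasNext() {
                return pending != null;
            }

            @Override
            public byte[] next() {
                if (pending == null) {
                    throw new NoSuchElementException();
                }
                byte[] value = pending.getValue();
                pending = advance();
                return value;
            }
        };
    }
}
```

With this layout, calling add("k", bytes) repeatedly and then iterating scan("k") returns the elements in insertion order without ever holding the whole list in memory at once. Whether the real backend can expose state this way is exactly what I am unsure about.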

Many thanks for any comments or thoughts,

 Jan
