Hi all,
I have a question that may look like a user@ question, but it brought me
to the dev@ mailing list because it came up while I was browsing through
Flink's source code. First, I'll briefly describe my use case. I'm trying
to do a group-by-key (GBK) operation with a limited number of distinct keys
(which I cannot control), but a non-trivial number of values per key. The
operation in the GBK is non-combining, so all values for a key (and there
are many) have to be stored in state. Running this on test data led to a
surprise (at least for me): even with RocksDBStateBackend, the whole list
of values for a key is serialized into a single binary blob and then
deserialized into a List, so it has to fit in memory (multiple times, in
fact).
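To make the access pattern concrete, here is a minimal sketch of the "one blob per key" layout. It is not Flink's code or encoding: the class, a HashMap standing in for RocksDB, and writeUTF as the element format are all illustrative assumptions. Appends only concatenate bytes, but a read fetches the whole value in one lookup and decodes it into a full List, so the blob and the decoded list are both on the heap at the same time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink code): list state kept as ONE serialized value per key.
 * A HashMap stands in for RocksDB.
 */
public class BlobListStateSketch {
    private final Map<String, byte[]> store = new HashMap<>();

    /** Appends by concatenating bytes onto the existing value, without decoding it. */
    public void add(String key, String value) {
        store.merge(key, encode(value), BlobListStateSketch::concat);
    }

    /** A single lookup returns the whole blob, which is then decoded into a full List in memory. */
    public List<String> get(String key) {
        byte[] blob = store.get(key);
        List<String> values = new ArrayList<>();
        if (blob == null) {
            return values;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob))) {
            while (in.available() > 0) {
                values.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return values;
    }

    /** Size in bytes of the single stored value for this key. */
    public int blobSize(String key) {
        byte[] blob = store.get(key);
        return blob == null ? 0 : blob.length;
    }

    private static byte[] encode(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value); // 2-byte length followed by the (modified UTF-8) characters
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] result = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }

    public static void main(String[] args) {
        BlobListStateSketch state = new BlobListStateSketch();
        for (int i = 0; i < 3; i++) {
            state.add("key-a", "value-" + i);
        }
        System.out.println(state.get("key-a"));      // [value-0, value-1, value-2]
        System.out.println(state.blobSize("key-a")); // 27 (3 x (2 + 7) bytes)
    }
}
```

However many values are added, reading any of them goes through get(), which materializes all of them.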
I tried to create an alternative RocksDBStateBackend that would store
each element of a ListState under a separate key in RocksDB, so that the
whole blob would not have to be loaded by a single get; instead, a scan
over multiple keys could be made. Digging into the source code, I found a
hierarchy of interfaces in an 'internal' package mirroring the public API:
InternalKvState, InternalMergingState, InternalListState, and so on. These
interfaces, however, form a different hierarchy than the public API
interfaces they mirror; most notably, InternalKvState is a superinterface
of all the others. This fact seems to be relied upon in multiple places
throughout the source code.
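The shape of the hierarchy described above can be sketched with simplified stand-ins. Every name and method here (PublicState, InternalRoot, serializedValue, and so on) is hypothetical and deliberately not Flink's: the public interfaces form one tree, while each internal interface extends both a common internal root and its public counterpart, so code can be written once against the root.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified stand-ins; not Flink's actual interfaces or signatures. */
public class StateHierarchySketch {
    // "Public API" side: list state refines appending state, which refines the root.
    interface PublicState { void clear(); }
    interface PublicAppendingState<IN, OUT> extends PublicState { OUT get(); void add(IN value); }
    interface PublicListState<T> extends PublicAppendingState<T, Iterable<T>> { }

    // "Internal" side: one common root that every internal state extends,
    // in addition to extending its public counterpart.
    interface InternalRoot extends PublicState {
        /** Hypothetical: the whole current value as one serialized array. */
        byte[] serializedValue();
    }
    interface InternalList<T> extends InternalRoot, PublicListState<T> { }

    /** Toy in-memory implementation; elements are strings for brevity. */
    public static final class HeapList implements InternalList<String> {
        private final List<String> elements = new ArrayList<>();
        @Override public void add(String value) { elements.add(value); }
        @Override public Iterable<String> get() { return elements; }
        @Override public void clear() { elements.clear(); }
        @Override public byte[] serializedValue() {
            // Joins all elements, so this call needs the whole list at once.
            return String.join(",", elements).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Code written against the common root accepts every internal state type. */
    public static int serializedSize(InternalRoot state) {
        return state.serializedValue().length;
    }

    public static void main(String[] args) {
        HeapList list = new HeapList();
        list.add("a");
        list.add("bc");
        System.out.println(serializedSize(list));           // 4 ("a,bc")
        System.out.println(list instanceof PublicListState); // true
    }
}
```

In this toy version, if the common root demands the whole value as one array, a per-element storage layout would have to reassemble the full list just to satisfy that one method.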
My question is: is this intentional? Would it be possible to store each
element of a ListState under a separate key in RocksDB (probably by
appending a per-element suffix to the state's actual key)? What are the
pitfalls? And is it necessary for InternalListState to actually be a
subinterface of InternalKvState? I find this to be a related problem.
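The per-element idea can be sketched as follows, under stated assumptions: a TreeMap ordered by unsigned byte comparison stands in for RocksDB's sorted keyspace (RocksDB's default comparator is bytewise), and the key layout, class, and method names are hypothetical rather than anything in Flink. Each element is stored as `[4-byte key length][state key][8-byte big-endian index]`, and reading is a prefix scan that hands elements out one at a time.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not Flink or RocksDB API): one store entry per list element.
 * A TreeMap with unsigned byte ordering stands in for RocksDB's sorted keyspace.
 */
public class PerElementListStateSketch {
    private final NavigableMap<byte[], byte[]> store = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);
    // Next free element index per state key; a real backend would have to persist or recover this.
    private final Map<String, Long> nextIndex = new HashMap<>();

    /** Prefix shared by all elements of one state key: 4-byte length, then the key bytes. */
    static byte[] prefix(String stateKey) {
        byte[] k = stateKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length).putInt(k.length).put(k).array();
    }

    /** Prefix followed by an 8-byte big-endian index, so elements sort in insertion order. */
    static byte[] elementKey(String stateKey, long index) {
        byte[] p = prefix(stateKey);
        return ByteBuffer.allocate(p.length + 8).put(p).putLong(index).array();
    }

    public void add(String stateKey, String value) {
        long index = nextIndex.merge(stateKey, 1L, Long::sum) - 1; // 0, 1, 2, ...
        store.put(elementKey(stateKey, index), value.getBytes(StandardCharsets.UTF_8));
    }

    /** Prefix scan: visits elements one by one instead of loading one blob. */
    public void forEach(String stateKey, Consumer<String> action) {
        byte[] p = prefix(stateKey);
        for (Map.Entry<byte[], byte[]> e : store.tailMap(p, true).entrySet()) {
            byte[] key = e.getKey();
            if (key.length < p.length || !Arrays.equals(key, 0, p.length, p, 0, p.length)) {
                break; // sorted order: once the prefix stops matching, this key's range is over
            }
            action.accept(new String(e.getValue(), StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        PerElementListStateSketch state = new PerElementListStateSketch();
        state.add("ab", "x1");
        state.add("abc", "y1");
        state.add("ab", "x2");
        state.forEach("ab", System.out::println); // x1, then x2 (y1 belongs to "abc")
    }
}
```

The length prefix is a deliberate choice: with a plain suffix, a scan for key "ab" would also match the elements of key "abc", since both start with the same bytes. Keeping the element counter in memory is a simplification of this sketch only.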
Many thanks for any comments or thoughts,
Jan