Thanks a lot!

On Tue, Dec 19, 2017 at 11:08 PM, Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I filed a JIRA issue and pushed a PR for this.
>
> https://issues.apache.org/jira/browse/FLINK-8297
>
> Best,
>
>  Jan
>
>
> On 12/14/2017 11:13 AM, Stephan Ewen wrote:
>
>> Hi Jan!
>>
>> One could implement the RocksDB ListState like you suggested.
>>
>> We did it the current way because that pattern is actually quite efficient
>> if your list fits into memory: the list append is constant time, and the
>> values are only concatenated the first time the list is accessed.
>> Especially for typical windowing patterns (frequent append(), occasional
>> get()) this works quite well.
>>
>> It falls short when the lists get too large, that is correct. Breaking it
>> into individual elements means having a range iterator for list.get()
>> access, which I think is a bit more costly. It also needs a nifty way to
>> add a 'position' number into the key to make sure the list remains
>> ordered, and to avoid an extra read-modify-write of state every time this
>> number is updated.
>>
>> But all in all, it should be possible. Are you interested in working on
>> something like that and contributing it?
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> If I remember correctly, there was actually an effort to change the
>>> RocksDB list state the way you described. I'm cc'ing Stephan, who was
>>> involved in that and this is the Jira issue:
>>> https://issues.apache.org/jira/browse/FLINK-5756
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU
>>>> <ovidiu-cristian.ma...@inria.fr> wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> You could associate a key to each element of your Key's list (e.g.,
>>>> hashing the value), keep only the keys in heap (e.g., in a list) and the
>>>> associated state key-value/s in an external store like RocksDB/Redis.
>>>> However, you will notice large overheads due to de/serializing: a huge
>>>> penalty for more than 1000s of elements (see
>>>> https://hal.inria.fr/hal-01530744/document for some experimental
>>>> settings), even for a relatively small rate of new events per Key, if
>>>> you need to process all values of a Key for each new event. Best case
>>>> you can do some incremental processing, unless your non-combining means
>>>> non-associative operations per Key.
>>>>
>>>> Best,
>>>> Ovidiu
>>>>
>>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> thanks for the quick reply. What you suggest seems to work at first
>>>>> sight, I will try it. Is there any reason not to implement a
>>>>> RocksDBListState this way in general? Is there any increased overhead
>>>>> of this approach?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jan
>>>>>
>>>>>
>>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> I cannot comment on the internal design, but you could put the data
>>>>>> into a RocksDBStateBackend MapState<Integer, X> where the value X is
>>>>>> your data type and the key is the list index. You would need another
>>>>>> ValueState for the current number of elements that you put into the
>>>>>> MapState.
>>>>>> A MapState allows you to fetch and traverse the key, value, or entry
>>>>>> set of the Map without loading it completely into memory.
>>>>>> The sets are traversed in sort order of the key, so should be in
>>>>>> insertion order (given that you properly increment the list index).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a question that looks like a user@ question, but it brought me
>>>>>>> to the dev@ mailing list while I was browsing through the Flink
>>>>>>> source code. First I'll try to briefly describe my use case. I'm
>>>>>>> trying to do a group-by-key operation with a limited number of
>>>>>>> distinct keys (which I cannot control), but a non-trivial count of
>>>>>>> values. The operation in the GBK is non-combining, so that all values
>>>>>>> per key (many) have to be stored in a state. Running this on testing
>>>>>>> data led to a surprise (for me): even when using RocksDBStateBackend,
>>>>>>> the whole list of data is serialized into a single binary blob and
>>>>>>> then deserialized into a List, and therefore has to fit in memory
>>>>>>> (multiple times, in fact).
>>>>>>>
>>>>>>> I tried to create an alternative RocksDBStateBackend that would
>>>>>>> store each element of the list in a ListState under a separate key in
>>>>>>> RocksDB, so that the whole blob would not have to be loaded by a
>>>>>>> single get, but a scan over multiple keys could be made. Digging into
>>>>>>> the source code I found there is a hierarchy of classes mirroring the
>>>>>>> public API in the 'internal' package: InternalKvState,
>>>>>>> InternalMergingState, InternalListState, and so on. These classes,
>>>>>>> however, have a different hierarchy than the public API classes that
>>>>>>> they mirror; most notably, InternalKvState is a superinterface of all
>>>>>>> the others. This fact seems to be used in multiple places throughout
>>>>>>> the source code.
>>>>>>> My question is: is this intentional? Would it be possible to store
>>>>>>> each element of a ListState in a separate key in RocksDB (probably by
>>>>>>> adding some suffix to the actual key of the state for each element)?
>>>>>>> What are the pitfalls? And is it necessary for the InternalListState
>>>>>>> to actually be a subinterface of InternalKvState? I find this to be a
>>>>>>> related problem.
>>>>>>>
>>>>>>> Many thanks for any comments or thoughts,
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>>
>>>>>>>
>>>
>
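
For readers following the thread, here is a minimal sketch of the per-element layout discussed above: each list element is stored under its own composite key (state key plus a 'position' suffix), and list.get() becomes a range scan. This is not Flink's implementation and not the change proposed in FLINK-8297. The class name PerElementListSketch and its methods are hypothetical, a TreeMap sorted by unsigned byte order stands in for RocksDB, and the next position is kept in a separate counter (the role the extra ValueState plays in the MapState workaround), so it does not address the read-modify-write concern raised in the thread. It assumes Java 9+ for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: one store entry per list element (not Flink code). */
public class PerElementListSketch {
    // Stand-in for RocksDB: entries sorted by unsigned lexicographic byte order.
    private final TreeMap<byte[], byte[]> store =
            new TreeMap<byte[], byte[]>((a, b) -> Arrays.compareUnsigned(a, b));

    // Next free position per state key (assumption: kept outside the store).
    private final Map<String, Long> nextPosition = new HashMap<>();

    // Composite key: 4-byte key length, key bytes, 8-byte big-endian position.
    // The length prefix keeps the scan for "a" from also matching "ab".
    private static byte[] compositeKey(String key, long position) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length + 8)
                .putInt(k.length)
                .put(k)
                .putLong(position)
                .array();
    }

    /** Appends one element without reading or rewriting the existing ones. */
    public void add(String key, String value) {
        long position = nextPosition.merge(key, 1L, Long::sum) - 1;
        store.put(compositeKey(key, position), value.getBytes(StandardCharsets.UTF_8));
    }

    /** Reads the list back with a range scan; positions are non-negative,
     *  so big-endian byte order equals insertion order. */
    public List<String> get(String key) {
        byte[] from = compositeKey(key, 0L);
        byte[] to = compositeKey(key, -1L); // eight 0xFF bytes: upper bound
        List<String> result = new ArrayList<>();
        for (byte[] v : store.subMap(from, true, to, true).values()) {
            result.add(new String(v, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        PerElementListSketch state = new PerElementListSketch();
        state.add("user-1", "first");
        state.add("user-2", "other");
        state.add("user-1", "second");
        System.out.println(state.get("user-1"));
    }
}
```

The fixed-width big-endian suffix is what keeps the scan in insertion order; a real backend would iterate lazily instead of materializing the whole list as get() does here.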
