Hi Lei,

There is additional overhead when adding new keys to an operator, since
Flink needs to maintain the state, timers, etc. for each individual key.
If you are interested in more details, I suggest using the Flink Web UI to
compare the flame graphs of the two versions of the job (flame graphs are
disabled by default and are turned on with `rest.flamegraph.enabled: true`).
There you can see where the two versions differ and identify the exact
method call causing the difference.
The flame graphs could help you identify other bottlenecks too.

I hope this helps,
Peter

On Thu, Apr 11, 2024, 08:42 Lei Wang <leiwang...@gmail.com> wrote:

> Hi Peter,
>
> I tried it, and it improved performance significantly, but I don't know
> exactly why.
> As far as I know, the number of keys in RocksDB doesn't decrease.
>
> Is there any specific technical material about this?
>
> Thanks,
> Lei
>
>
> On Fri, Mar 29, 2024 at 9:49 PM Lei Wang <leiwang...@gmail.com> wrote:
>
>> Perhaps I can keyBy(Hash(originalKey) % 100000), and then, in the
>> KeyedProcessFunction, use MapState instead of ValueState:
>>   MapState<OriginalKey, Boolean> mapState
>>
>> There would be about 100,000 original keys in each MapState.
>>
>> Hopefully this will help.
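A stdlib-only Java sketch of this bucketing scheme, not Flink code: it assumes `String.hashCode()` plays the role of `Hash`, and a nested `HashMap` stands in for one `MapState<OriginalKey, Boolean>` per bucket key; `BucketedMapStateDedup` and `bucketOf` are hypothetical names. One detail it shows: in Java, `%` keeps the sign of the dividend, so `hash % 100000` ranges over -99,999..99,999, i.e. 199,999 bucket ids rather than 100,000. That does not break keyBy (negative keys are valid), but `Math.floorMod` keeps the count at exactly 100,000. The last arithmetic line checks the estimate above: 10 billion keys / 100,000 buckets = 100,000 keys per bucket.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation only: a nested HashMap stands in for Flink keyed state.
final class BucketedMapStateDedup {
    static final int NUM_BUCKETS = 100_000;

    // bucket id -> (original key -> seen flag), like one MapState<OriginalKey, Boolean> per bucket
    private final Map<Integer, Map<String, Boolean>> stateByBucket = new HashMap<>();

    // Hypothetical bucket function: String.hashCode() plays the role of Hash(originalKey).
    // Math.floorMod always returns a value in [0, NUM_BUCKETS), unlike %.
    static int bucketOf(String originalKey) {
        return Math.floorMod(originalKey.hashCode(), NUM_BUCKETS);
    }

    // Returns true for the first occurrence of originalKey (the element would be emitted).
    boolean processElement(String originalKey) {
        Map<String, Boolean> mapState =
                stateByBucket.computeIfAbsent(bucketOf(originalKey), b -> new HashMap<>());
        // putIfAbsent returns null only when the key was not present yet
        return mapState.putIfAbsent(originalKey, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        System.out.println(-123_456 % NUM_BUCKETS);               // prints -23456
        System.out.println(Math.floorMod(-123_456, NUM_BUCKETS)); // prints 76544
        System.out.println(10_000_000_000L / NUM_BUCKETS);        // prints 100000

        BucketedMapStateDedup dedup = new BucketedMapStateDedup();
        System.out.println(dedup.processElement("order-42"));     // prints true
        System.out.println(dedup.processElement("order-42"));     // prints false
    }
}
```

Because the full original key is stored inside the bucket's map, two keys that land in the same bucket are still told apart; the hash only decides how many Flink keys exist.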
>>
>> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Lei,
>>>
>>> Have you tried making the key smaller and storing the list of keys
>>> already seen as the value?
>>>
>>> Make the operator key a hash of your original key, and store the list of
>>> full keys in the state. You can experiment with the hash length to find
>>> the optimal number of keys.
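A minimal, stdlib-only Java sketch of this suggestion, not Flink code: a `HashMap` stands in for Flink keyed state, `String.hashCode()` stands in for the hash, and `HashBucketDedup`, `hashBits` and `operatorKey` are hypothetical names. Keeping only the low `hashBits` bits of the hash caps the number of operator keys at 2^hashBits, so the "hash length" becomes a single tuning knob, while the full keys live in the per-bucket value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: a HashMap stands in for Flink keyed state.
final class HashBucketDedup {
    private final int hashBits; // hypothetical knob: number of hash bits kept
    // operator key (truncated hash) -> list of full keys already seen in that bucket
    private final Map<Integer, List<String>> state = new HashMap<>();

    HashBucketDedup(int hashBits) {
        if (hashBits < 1 || hashBits > 31) {
            throw new IllegalArgumentException("hashBits must be in 1..31");
        }
        this.hashBits = hashBits;
    }

    // Keeps the low hashBits bits of the hash: at most 2^hashBits operator keys.
    int operatorKey(String fullKey) {
        return fullKey.hashCode() & ((1 << hashBits) - 1);
    }

    // Returns true the first time fullKey is seen, false for duplicates.
    boolean firstSeen(String fullKey) {
        List<String> seen = state.computeIfAbsent(operatorKey(fullKey), k -> new ArrayList<>());
        if (seen.contains(fullKey)) {
            return false; // duplicate; keys that merely share a bucket differ in the full key
        }
        seen.add(fullKey);
        return true;
    }

    int operatorKeyCount() {
        return state.size();
    }

    public static void main(String[] args) {
        HashBucketDedup dedup = new HashBucketDedup(4); // at most 16 operator keys
        int emitted = 0;
        for (int i = 0; i < 1_000; i++) {
            // 100 distinct keys, each arriving 10 times
            if (dedup.firstSeen("user-" + (i % 100))) {
                emitted++;
            }
        }
        System.out.println(emitted);                        // prints 100
        System.out.println(dedup.operatorKeyCount() <= 16); // prints true
    }
}
```

The list mirrors the wording above, but lookups in it are linear; with many keys per bucket a set (or, in Flink, a MapState keyed by the full key) avoids that cost.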
>>>
>>> I hope this helps,
>>> Peter
>>>
>>> On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:
>>>
>>>>
>>>> I use the RocksDB state backend to store whether an element has
>>>> appeared within the last day. Here is the code:
>>>>
>>>> public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {
>>>>
>>>>     // true once the key has been seen; cleared when the 24-hour TTL expires
>>>>     private ValueState<Boolean> isExist;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         ValueStateDescriptor<Boolean> desc = new ........
>>>>         StateTtlConfig ttlConfig =
>>>>             StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
>>>>         desc.enableTimeToLive(ttlConfig);
>>>>         isExist = getRuntimeContext().getState(desc);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(IN in, .... ) throws Exception {
>>>>         // emit only the first occurrence of the key while its state is alive
>>>>         if (null == isExist.value()) {
>>>>             out.collect(in);
>>>>             isExist.update(true);
>>>>         }
>>>>     }
>>>> }
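To make the semantics of the function above explicit, here is a stdlib-only Java simulation (not Flink code): a `HashMap` of write timestamps stands in for the TTL'd `ValueState<Boolean>`, and `TtlDedupSimulation` is a hypothetical name. Because the `setUpdateType` argument is elided in the original, this assumes `StateTtlConfig.UpdateType.OnCreateAndWrite` (Flink's default) and that expired values are never returned.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Simulation only, assuming UpdateType.OnCreateAndWrite and that expired values are never returned.
final class TtlDedupSimulation {
    private final long ttlMillis;
    // key -> time (ms) of the last write; stands in for ValueState<Boolean> plus its TTL timestamp
    private final Map<String, Long> lastWrite = new HashMap<>();

    TtlDedupSimulation(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    // Mirrors processElement: emit when the state is absent or expired, then write it.
    // Reading does not refresh the timestamp (OnCreateAndWrite).
    boolean processElement(String key, long nowMillis) {
        Long written = lastWrite.get(key);
        boolean exists = written != null && nowMillis - written < ttlMillis;
        if (exists) {
            return false;
        }
        lastWrite.put(key, nowMillis); // corresponds to isExist.update(true)
        return true;
    }

    public static void main(String[] args) {
        TtlDedupSimulation dedup = new TtlDedupSimulation(Duration.ofHours(24));
        long hour = Duration.ofHours(1).toMillis();
        System.out.println(dedup.processElement("k", 0));         // prints true
        System.out.println(dedup.processElement("k", 23 * hour)); // prints false
        System.out.println(dedup.processElement("k", 24 * hour)); // prints true
        System.out.println(dedup.processElement("k", 25 * hour)); // prints false
    }
}
```

Under that assumption, reads do not extend the TTL, so a key that keeps arriving is emitted again 24 hours after each emission rather than being suppressed forever.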
>>>>
>>>> Because the number of distinct keys is very large (about 10 billion
>>>> per day), this operator is a performance bottleneck.
>>>> How can I optimize its performance?
>>>>
>>>> Thanks,
>>>> Lei
>>>>
>>>>
>>>
