Thanks, Dongwon. That was extremely helpful. I didn't quite understand how
async I/O can be used here; it would be great if you could share some more
info on it.

Also, how are you propagating changes to the values?

Regards,
Navneeth

On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Oops, I forgot to mention that when doing bulk inserts into Redis, you'd
> better open a pipeline with the 'transaction' property set to False [1].
>
> Otherwise, API calls from your Flink job will time out.
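>
> In case your loader is Java-based: the same idea with Jedis would look
> roughly like this (just a sketch, not my actual code; the host, port, and
> entries map are placeholders). Jedis pipelines skip MULTI/EXEC by default,
> which matches setting 'transaction' to False in redis-py:
>
> import java.util.Map;
> import redis.clients.jedis.Jedis;
> import redis.clients.jedis.Pipeline;
>
> public class BulkLoader {
>     public static void load(Map<String, String> entries) {
>         try (Jedis jedis = new Jedis("localhost", 6379)) {
>             // pipelined() batches commands without wrapping them in a
>             // MULTI/EXEC transaction, so the bulk insert doesn't run as
>             // one atomic block that stalls reads from the Flink job
>             Pipeline p = jedis.pipelined();
>             entries.forEach(p::set);
>             p.sync(); // flush the batch and wait for all replies
>         }
>     }
> }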
>
> [1] https://github.com/andymccurdy/redis-py#pipelines
>
> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
>> Hi Navneeth,
>>
>> I reported an issue similar to yours before [1], but at first I took the
>> broadcasting approach.
>>
>> As you already anticipated, broadcasting is going to use more memory than
>> your current approach based on a static object on each TM.
>>
>> Also, the broadcast data is treated as operator state and is periodically
>> checkpointed, which incurs serialization overhead and garbage collection.
>> These costs are not negligible at all if you don't choose your
>> serialization strategy carefully, as explained in [2].
>> Even with a proper strategy, I've experienced mild back pressure whenever:
>> - a checkpoint is in progress (AFAIK, incremental checkpointing does
>> nothing for operator state)
>> - the cache is being broadcast
>>
>> For that reason, I decided to populate the data in Redis instead, but that
>> also calls for design decisions:
>> - which Java client to use? Jedis [3] or Lettuce [4]?
>> - how to invoke API calls inside Flink: synchronously or asynchronously?
>>
>> Currently I'm very satisfied with Lettuce combined with Flink's async I/O
>> [5]: the memory footprint is very small, and there's no need to worry
>> about serialization overhead or garbage collection.
>> Lettuce supports asynchronous communication, so it works perfectly with
>> Flink's async I/O.
>> I bet you'd be very disappointed by invoking Jedis synchronously inside a
>> ProcessFunction.
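>>
>> To give you a concrete idea, the lookup looks roughly like this (a minimal
>> sketch, not my production code; the Redis address and the plain
>> String-to-String lookup are placeholders):
>>
>> import java.util.Collections;
>> import io.lettuce.core.RedisClient;
>> import io.lettuce.core.api.StatefulRedisConnection;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.async.ResultFuture;
>> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
>>
>> public class RedisLookup extends RichAsyncFunction<String, String> {
>>     private transient RedisClient client;
>>     private transient StatefulRedisConnection<String, String> conn;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         client = RedisClient.create("redis://localhost:6379"); // placeholder
>>         conn = client.connect();
>>     }
>>
>>     @Override
>>     public void asyncInvoke(String key, ResultFuture<String> result) {
>>         // Lettuce's RedisFuture implements CompletionStage, so nothing
>>         // blocks here; the callback completes Flink's future when Redis
>>         // answers
>>         conn.async().get(key)
>>             .thenAccept(v -> result.complete(Collections.singleton(v)))
>>             .exceptionally(t -> { result.completeExceptionally(t); return null; });
>>     }
>>
>>     @Override
>>     public void close() {
>>         conn.close();
>>         client.shutdown();
>>     }
>> }
>>
>> You then wire it in via AsyncDataStream [5], e.g.
>> AsyncDataStream.unorderedWait(keys, new RedisLookup(), 1, TimeUnit.SECONDS, 100),
>> where the timeout and capacity are values you'd tune for your workload.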
>>
>> Best,
>>
>> Dongwon
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>> [2]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://github.com/redis/jedis
>> [4] https://lettuce.io/
>> [5]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>
>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We have a Flink streaming job processing around 200k events per second.
>>> The job requires a lot of infrequently changing data (sort of static, but
>>> there will be some changes over time, say a 5% change once per day or so).
>>> There are about 12 caches, some containing approximately 20k entries and
>>> a few holding about 2 million entries.
>>>
>>> In the current implementation we use an in-memory, lazily loaded static
>>> cache to hold the data, and the initialization happens in the open
>>> function. We chose this approach because we have allocated around 4 GB of
>>> extra memory per TM for these caches, and if a TM has 6 slots the cache
>>> can be shared across them.
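>>>
>>> Roughly, each operator does something like this (a simplified sketch;
>>> loadAllEntries() stands in for our REST-based loading logic):
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>> import org.apache.flink.configuration.Configuration;
>>>
>>> public class EnrichFunction extends RichMapFunction<String, String> {
>>>     // static => one copy per TM JVM, shared by all slots
>>>     private static volatile Map<String, String> cache;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) {
>>>         if (cache == null) {
>>>             synchronized (EnrichFunction.class) {
>>>                 if (cache == null) {
>>>                     cache = loadAllEntries(); // the slow lazy load
>>>                 }
>>>             }
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public String map(String key) {
>>>         return cache.get(key);
>>>     }
>>>
>>>     private static Map<String, String> loadAllEntries() {
>>>         return new HashMap<>(); // placeholder for the REST calls
>>>     }
>>> }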
>>>
>>> Now the issue with this approach is that every time a container is
>>> restarted or a new job is deployed, it has to populate the cache again.
>>> Sometimes this lazy loading takes a while, and it causes back pressure as
>>> well. We were thinking of moving this logic to a broadcast stream, but
>>> since the data has to be stored per slot, that would increase the memory
>>> consumption by a lot.
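>>>
>>> For reference, the broadcast variant we considered looks something like
>>> this (a sketch; cacheUpdates and events are placeholder streams). Since
>>> Flink keeps one copy of the broadcast state per parallel instance, this
>>> is exactly where the per-slot memory blow-up comes from:
>>>
>>> MapStateDescriptor<String, String> desc =
>>>     new MapStateDescriptor<>("cache", String.class, String.class);
>>>
>>> // every parallel instance receives and stores the full map
>>> BroadcastStream<Tuple2<String, String>> updates = cacheUpdates.broadcast(desc);
>>>
>>> DataStream<String> enriched = events.connect(updates).process(
>>>     new BroadcastProcessFunction<String, Tuple2<String, String>, String>() {
>>>         @Override
>>>         public void processElement(String key, ReadOnlyContext ctx,
>>>                 Collector<String> out) throws Exception {
>>>             out.collect(ctx.getBroadcastState(desc).get(key)); // lookup
>>>         }
>>>
>>>         @Override
>>>         public void processBroadcastElement(Tuple2<String, String> update,
>>>                 Context ctx, Collector<String> out) throws Exception {
>>>             ctx.getBroadcastState(desc).put(update.f0, update.f1); // apply change
>>>         }
>>>     });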
>>>
>>> Another option we were considering is to replace the current near/far
>>> cache, which uses a REST API to load the data, with a Redis-based
>>> near/far cache. That would definitely reduce the overall loading time,
>>> but it's still not a perfect solution.
>>>
>>> Are there any recommendations on how this can be achieved effectively?
>>> Also, how is everyone else dealing with this problem?
>>>
>>> Thanks,
>>> Navneeth
>>>
>>>
