Re: Redis as a State Backend

2024-02-14 Thread David Morávek
Here is my "little harsh/straightforward feedback", but it's based on facts
and real-world experience using Redis since ~2012.

Redis is not a database, period. The best description of Redis is something
along the lines of "in-memory, text-only (base64 ftw) data structures on top
of a TCP socket". The sweet spot for Redis is the in-memory caching layer
(Memcached is the closest equivalent). The misconceptions around Redis have
haunted me for the past decade.

Redis does not provide any good primitives for participating in the
checkpointing / fault-tolerance mechanism beyond what can already be
implemented natively in the heap state backend. It can do a full snapshot of
the database (costly; we need incremental, ...) or a text-based append-only
log (the changelog state backend).

All data needs to fit in memory. In text form. No compression (you can of
course compress before doing base64).
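To make the base64 overhead concrete, here is a quick back-of-the-envelope check (plain Python, nothing Redis-specific; the payload is arbitrary illustrative binary data):

```python
import base64
import zlib

# 16 KiB of arbitrary binary data standing in for serialized state
payload = bytes(range(256)) * 64

encoded = base64.b64encode(payload)
# base64 emits 4 output bytes per 3 input bytes: ~33% inflation
ratio = len(encoded) / len(payload)
print(round(ratio, 2))  # ~1.33

# compressing *before* encoding (as suggested above) can win it back,
# at the cost of extra CPU on every read and write
compressed_then_encoded = base64.b64encode(zlib.compress(payload))
print(len(compressed_then_encoded) < len(encoded))
```

The 4/3 inflation applies to every value stored this way, which compounds the "all data needs to fit in memory" constraint.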

We should not even bother comparing it to RocksDB; its Flink equivalent is
the HeapStateBackend, which could be made far more performant because it
eliminates the need to go through the network stack.
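A toy illustration of why the heap backend has this edge, even before any network hop is counted: a heap read is a reference lookup, while any out-of-process store pays a serialize/deserialize round trip per access. This sketch uses `pickle` as a stand-in for whatever codec a remote store's client would use:

```python
import pickle
import timeit

# toy state object; real Redis access would add a TCP round trip on top
state = {"clicks": [1, 2, 3], "user": 42}

def heap_read():
    # heap-state-backend style: the object lives on the heap,
    # a read is just a dictionary lookup returning a reference
    return state["clicks"]

def remote_style_read():
    # external-store style: every read pays serialize + deserialize
    return pickle.loads(pickle.dumps(state["clicks"]))

t_heap = timeit.timeit(heap_read, number=50_000)
t_remote = timeit.timeit(remote_style_read, number=50_000)
print(t_remote > t_heap)  # the codec round trip alone dominates
```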

Best,
D.

On Wed, Jan 31, 2024 at 5:27 PM David Anderson  wrote:

> When it comes to decoupling the state store from Flink, I suggest taking a
> look at FlinkNDB, which is an experimental state backend for Flink that
> puts the state into an external distributed database. There's a Flink
> Forward talk [1] and a master's thesis [2] available.
>
> [1] https://www.youtube.com/watch?v=ZWq_TzsXssM
> [2] http://www.diva-portal.org/smash/get/diva2:1536373/FULLTEXT01.pdf
>
> On Wed, Jan 31, 2024 at 12:30 AM Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
>> Thanks Zakelly and Junrui.
>>
>> I was actually exploring RocksDB as a state backend and I thought maybe
>> Redis could offer more features as a state backend: for example, state
>> sharing between operators, geo-redundancy of state, partitioning, etc. I
>> understand these are not native use cases for Flink, but maybe something
>> that can be considered in the future, perhaps even as an off-the-shelf
>> state backend framework that allows embedding any other cache as a state
>> backend.
>>
>> The links you shared are useful and will really help me. Really
>> appreciate it.
>>
>> Thanks
>>
>> On Tuesday, 30 January, 2024 at 01:43:14 pm IST, Zakelly Lan <
>> zakelly@gmail.com> wrote:
>>
>>
>> And I found some previous discussion, FYI:
>> 1. https://issues.apache.org/jira/browse/FLINK-3035
>> 2. https://www.mail-archive.com/dev@flink.apache.org/msg10666.html
>>
>> Hope this helps.
>>
>> Best,
>> Zakelly
>>
>> On Tue, Jan 30, 2024 at 4:08 PM Zakelly Lan 
>> wrote:
>>
>> Hi Chirag
>>
>> That's an interesting idea. IIUC, storing key-values can be implemented
>> simply on Redis, but supporting checkpoint and recovery is relatively
>> challenging. Flink's checkpoint must be consistent across all stateful
>> operators at the same point in time. For an *embedded* and *file-based*
>> key-value store like RocksDB, this is easier to implement by
>> asynchronously uploading the files of a specific point in time.
>>
>> Moreover, if you want to store your state in memory anyway, why not use
>> the HashMapStateBackend? It saves the overhead of serialization and
>> deserialization and, I'd guess, may achieve better performance than Redis.
>>
>>
>> Best,
>> Zakelly
>>
>> On Tue, Jan 30, 2024 at 2:15 PM Chirag Dewan via user <
>> user@flink.apache.org> wrote:
>>
>> Hi,
>>
>> I was looking at the FLIP-254: Redis Streams Connector and I was
>> wondering if Flink ever considered Redis as a state backend? And if yes,
>> why was it discarded compared to RocksDB?
>>
>> If someone can point me towards any deep dives on why RocksDB is a better
>> fit as a state backend, it would be helpful.
>>
>> Thanks,
>> Chirag
>>
>>


RE: Stream enrichment with ingest mode

2024-02-14 Thread LINZ, Arnaud
Hello,

You’re right: one of our main use cases consists of adding missing fields, 
stored in a “small”, periodically refreshed reference table, to a stream. We 
chose not to use a broadcast stream and a Flink join, because we didn’t want 
to add tricky watermarks and hold one stream (it may build a huge state when 
using a window, and you don’t always have control over the source function to 
make it wait before emitting) until everything is broadcast.

So, we developed tools that load a static RAM hashmap cache from the 
reference table in the open() method of our enrichment operator, without 
using Flink streams, and launch a thread to periodically refresh the hashmap. 
We also use the same hashing mechanism as Flink so that each task manager 
loads only the part of the table that is relevant to its share of the keyed 
stream.
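The approach above can be sketched as follows. This is a hypothetical, Flink-free illustration: `load_reference_table()` and its data are made up, `hash(k) % parallelism` stands in for Flink's actual murmur-hash / key-group assignment, and the refresh interval is arbitrary:

```python
import threading

def load_reference_table():
    # stand-in for the real lookup (JDBC query, file read, ...);
    # the data here is purely illustrative
    return {"cat-1": "Books", "cat-2": "Games"}

class EnrichmentCache:
    """RAM cache loaded at operator startup and refreshed in the background."""

    def __init__(self, subtask_index, parallelism, refresh_seconds=300):
        self.subtask_index = subtask_index
        self.parallelism = parallelism
        self.refresh_seconds = refresh_seconds
        self.table = {}

    def open(self):
        # analogous to the operator's open(): load once, then schedule refreshes
        self.refresh()
        self._schedule()

    def refresh(self):
        full = load_reference_table()
        # keep only the keys this subtask owns, mimicking the partitioning
        # of the keyed stream (Flink really uses murmur hash + key groups)
        self.table = {k: v for k, v in full.items()
                      if hash(k) % self.parallelism == self.subtask_index}

    def _schedule(self):
        timer = threading.Timer(self.refresh_seconds, self._tick)
        timer.daemon = True
        timer.start()

    def _tick(self):
        self.refresh()
        self._schedule()

    def lookup(self, key):
        return self.table.get(key)
```

Each parallel instance then holds only its slice of the reference table, so the memory cost does not multiply with parallelism.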

IMHO this stuff should be part of the framework; it's easier to do with Spark 
Streaming… :-)

Best regards,
Arnaud

De : Lars Skjærven 
Envoyé : mercredi 14 février 2024 08:12
À : user 
Objet : Stream enrichment with ingest mode

Dear all,

A recurring challenge we have with stream enrichment in Flink is finding a 
robust mechanism to determine that all messages from the source(s) have been 
consumed/processed before output is collected.

A simple example is two sources of catalogue metadata:
- source A delivers products,
- source B delivers product categories,

For a process function to enrich the categories with the number of products in 
each category, we would do a KeyedCoProcessFunction (or a RichCoFlatMap), keyed 
by category ID, and put both the category and products in state. Then count all 
products for each keyed state and collect the result.

Typically, however, we don't want to start counting before all products are 
in state (to avoid emitting incomplete aggregations downstream). Therefore we 
use the event lag time (i.e. processing time - current watermark) to detect 
the "ingest mode" of the processor (e.g. lag time > 30 seconds). When in 
"ingest mode" we register a timer and return without collecting. Finally, the 
timer fires once the watermark has advanced sufficiently.
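The core of that decision reduces to a lag check; here is a minimal, Flink-free sketch of it (the 30-second threshold is taken from the description above, the function names are made up):

```python
INGEST_LAG_MS = 30_000  # "ingest mode" threshold from the text above

def in_ingest_mode(processing_time_ms, current_watermark_ms,
                   threshold_ms=INGEST_LAG_MS):
    # event lag time = processing time - current watermark;
    # a large lag means the initial ingest is (probably) still running
    return processing_time_ms - current_watermark_ms > threshold_ms

def on_element(processing_time_ms, current_watermark_ms):
    if in_ingest_mode(processing_time_ms, current_watermark_ms):
        # buffer the element in keyed state and register an event-time
        # timer; emit nothing yet
        return "buffer-and-set-timer"
    # watermark has caught up: safe to collect the aggregate
    return "collect"
```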

This "ingest mode" strategy (and its timers) becomes more complicated when 
you have multiple process functions (with the same need for an ingest mode) 
downstream of the first one. The reason seems to be that watermarks are 
forwarded by the first process function even though no elements are 
collected. Therefore, when elements finally arrive at the second process 
function, the current watermark has already advanced, so the same 
watermark-based strategy is less robust.

I'm curious how others in the community handle this "challenge" of initial 
ingest. Any ideas are greatly appreciated.

Note: we use a custom watermark generator that emits watermarks derived from 
event time, and advances the watermarks when the source is idle for a longer 
period (e.g. 30 seconds).
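For reference, the idle-aware generator described in the note could look roughly like this. It is a sketch of the behavior only, not Flink's actual `WatermarkGenerator` API, and the class/method names are invented:

```python
IDLE_TIMEOUT_MS = 30_000  # advance the watermark after 30 s of idleness

class IdleAwareWatermarkGenerator:
    """Watermarks follow event time; when the source has been idle for
    longer than IDLE_TIMEOUT_MS, the watermark advances with processing
    time so that downstream timers can still fire."""

    def __init__(self, idle_timeout_ms=IDLE_TIMEOUT_MS):
        self.idle_timeout_ms = idle_timeout_ms
        self.max_event_time = 0
        self.last_event_processing_time = 0

    def on_event(self, event_time_ms, processing_time_ms):
        self.max_event_time = max(self.max_event_time, event_time_ms)
        self.last_event_processing_time = processing_time_ms

    def on_periodic_emit(self, processing_time_ms):
        idle_for = processing_time_ms - self.last_event_processing_time
        if idle_for > self.idle_timeout_ms:
            # source is idle: let the watermark track processing time
            return processing_time_ms
        # source is active: watermark derived from event time
        return self.max_event_time
```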

Thanks !

L







The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.