Hello,

You’re right, one of our main use cases consists of adding missing fields,
stored in a “small” reference table that is periodically refreshed, to a
stream. We chose not to use a broadcast stream with a Flink join, because we
didn’t want to add tricky watermarks and hold back one stream until everything
has been broadcast (holding it in a window may build up a huge state, and you
don’t always have enough control over the source function to make it wait
before emitting).

So we developed tools that load a static in-memory hash map cache from the
reference table in the open() method of our enrichment operator, without using
Flink streams, and start a thread that periodically refreshes the hash map. We
also use the same hashing mechanism as Flink, so that each task manager loads
only the part of the table that is relevant to its share of the keyed stream.
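
A minimal plain-Java sketch of this kind of cache, not our actual tooling. The
class name RefreshingReferenceCache, the loader supplier and the ownedBySubtask
predicate are all hypothetical. In a real job the predicate could, for example,
compare KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
maxParallelism, parallelism) with the subtask index; that call is left out here
so the sketch compiles without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: in-memory reference cache refreshed by a background thread. */
class RefreshingReferenceCache<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> loader;   // reads the reference table (assumed)
    private final Predicate<K> ownedBySubtask;  // true for keys routed to this subtask
    private volatile Map<K, V> snapshot = new HashMap<>();
    private ScheduledExecutorService scheduler;

    RefreshingReferenceCache(Supplier<Map<K, V>> loader, Predicate<K> ownedBySubtask) {
        this.loader = loader;
        this.ownedBySubtask = ownedBySubtask;
    }

    /** Intended to be called from open(): load once, then refresh periodically. */
    void start(long period, TimeUnit unit) {
        reload();
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "reference-cache-refresh");
            t.setDaemon(true); // do not keep the JVM alive because of the refresher
            return t;
        });
        scheduler.scheduleAtFixedRate(this::reloadQuietly, period, period, unit);
    }

    /** Builds a fresh map with the relevant keys only, then swaps it in atomically. */
    void reload() {
        Map<K, V> fresh = new HashMap<>();
        loader.get().forEach((k, v) -> {
            if (ownedBySubtask.test(k)) {
                fresh.put(k, v);
            }
        });
        snapshot = fresh;
    }

    /** A failed refresh keeps the previous snapshot and does not cancel the schedule. */
    private void reloadQuietly() {
        try {
            reload();
        } catch (RuntimeException e) {
            // keep serving the old snapshot; a real job would log this
        }
    }

    V lookup(K key) {
        return snapshot.get(key);
    }

    int size() {
        return snapshot.size();
    }

    /** Intended to be called from close(): stops the refresh thread. */
    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

The sketch keeps one cache per operator instance. A static holder shared by all
slots of a task manager is possible too, but then the predicate has to accept
the keys of every subtask running in that JVM.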

IMHO this should be part of the framework; it’s easier to do with Spark
Streaming… :-)

Best regards,
Arnaud

From: Lars Skjærven <lar...@gmail.com>
Sent: Wednesday, 14 February 2024 08:12
To: user <user@flink.apache.org>
Subject: Stream enrichment with ingest mode

Dear all,

A recurring challenge we have with stream enrichment in Flink is finding a
robust mechanism to determine that all messages from the source(s) have been
consumed/processed before output is collected.

A simple example is two sources of catalogue metadata:
- source A delivers products,
- source B delivers product categories.

For a process function to enrich the categories with the number of products in
each category, we would use a KeyedCoProcessFunction (or a
RichCoFlatMapFunction), keyed by category ID, and put both the category and its
products in state. We then count all products in each keyed state and collect
the result.
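
To make the state layout concrete, here is a plain-Java stand-in for such a
two-input keyed function. The class CategoryCounter and its methods are
hypothetical, and an ordinary HashMap plays the role that keyed state (for
example a ValueState for the category and a MapState for the products) would
play in Flink.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java stand-in for the per-key state of a two-input keyed function. */
class CategoryCounter {
    private static final class CategoryState {
        String categoryName;                             // set by source B
        final Set<String> productIds = new HashSet<>();  // filled by source A
    }

    // One entry per category ID, mimicking state scoped to the current key.
    private final Map<String, CategoryState> stateByCategoryId = new HashMap<>();

    /** Input 1 (source A): a product belonging to the given category. */
    void onProduct(String categoryId, String productId) {
        stateFor(categoryId).productIds.add(productId);
    }

    /** Input 2 (source B): the category record itself. */
    void onCategory(String categoryId, String categoryName) {
        stateFor(categoryId).categoryName = categoryName;
    }

    /** Returns "name=count", or null while the category record is still missing. */
    String enriched(String categoryId) {
        CategoryState state = stateByCategoryId.get(categoryId);
        if (state == null || state.categoryName == null) {
            return null;
        }
        return state.categoryName + "=" + state.productIds.size();
    }

    private CategoryState stateFor(String categoryId) {
        return stateByCategoryId.computeIfAbsent(categoryId, id -> new CategoryState());
    }
}
```

Storing product IDs in a set means a product delivered twice is counted once.
The sketch does not decide when enriched() should be called.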

Typically, however, we don't want to start counting before all products are
in state (to avoid emitting incomplete aggregations downstream). We therefore
use the event lag time (i.e. processing time minus the current watermark) to
detect when the processor is in "ingest mode" (e.g. lag time > 30 seconds).
While in "ingest mode" we register a timer and return without collecting. The
timer eventually fires once the watermark has advanced sufficiently.
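
The lag check can be sketched in plain Java as below. IngestModeGate and its
method names are hypothetical. In a process function the two inputs would come
from ctx.timerService().currentProcessingTime() and
ctx.timerService().currentWatermark(). The timestamp chosen for the timer is an
assumption, since the exact value is not stated here.

```java
/** Hypothetical helper for the "ingest mode" decision based on event lag. */
final class IngestModeGate {
    private final long maxLagMs;

    IngestModeGate(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** True when the watermark trails processing time by more than maxLagMs. */
    boolean inIngestMode(long processingTimeMs, long watermarkMs) {
        // Before the first watermark arrives the timer service reports
        // Long.MIN_VALUE; subtracting it would overflow, so treat it as "behind".
        if (watermarkMs == Long.MIN_VALUE) {
            return true;
        }
        return processingTimeMs - watermarkMs > maxLagMs;
    }

    /**
     * Assumed choice of event-time timer: the watermark value at which the lag,
     * measured against the given processing time, is no longer above the limit.
     */
    long retryTimerTimestamp(long processingTimeMs) {
        return processingTimeMs - maxLagMs;
    }
}
```

Processing time keeps moving while the timer is pending, so the timer callback
would need to repeat the check and register a new timer if the lag is still too
large.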

This "ingest mode" strategy (with timers) becomes more complicated when there
are several process functions with the same need for an ingest mode downstream
of the first one. The reason seems to be that the first process function
forwards watermarks even when it collects no elements. By the time elements
finally arrive at the second process function, the current watermark has
already advanced, so the same watermark-based strategy is less robust there.

I'm curious how others in the community handle this "challenge" of initial 
ingest. Any ideas are greatly appreciated.

Note: we use a custom watermark generator that emits watermarks derived from 
event time, and advances the watermarks when the source is idle for a longer 
period (e.g. 30 seconds).
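
For readers unfamiliar with this kind of generator, a plain-Java sketch follows.
It is not our actual implementation. The method names mirror the onEvent and
onPeriodicEmit callbacks of Flink's WatermarkGenerator, but the class
IdleAdvancingWatermarks is hypothetical, wall-clock time is passed in
explicitly, and the rule used to advance the watermark while idle is an
assumption.

```java
/** Plain-Java sketch of an event-time watermark that advances while the source is idle. */
final class IdleAdvancingWatermarks {
    private final long idleTimeoutMs;
    private long maxEventTimeMs = Long.MIN_VALUE;
    private long lastEventWallClockMs;
    private long lastEmittedMs = Long.MIN_VALUE;
    private boolean seenEvent = false;

    IdleAdvancingWatermarks(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Called per record: track the highest event time and when it arrived. */
    void onEvent(long eventTimeMs, long wallClockMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        lastEventWallClockMs = wallClockMs;
        seenEvent = true;
    }

    /** Called on a periodic tick: returns the watermark to emit. */
    long onPeriodicEmit(long wallClockMs) {
        if (!seenEvent) {
            return lastEmittedMs; // nothing to derive a watermark from yet
        }
        long idleMs = wallClockMs - lastEventWallClockMs;
        long candidate = maxEventTimeMs;
        if (idleMs >= idleTimeoutMs) {
            // Assumption: once idle, advance by the time spent idle beyond the timeout.
            candidate = maxEventTimeMs + (idleMs - idleTimeoutMs);
        }
        // Never move the watermark backwards.
        lastEmittedMs = Math.max(lastEmittedMs, candidate);
        return lastEmittedMs;
    }
}
```

With a 30-second timeout, an event with event time 1000 seen at wall clock 5000
gives a watermark of 1000 on a tick at 10000 and 11000 on a tick at 45000.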

Thanks!

L



