Hello,
You’re right, one of our main use cases consists of adding missing fields,
stored in a “small” reference table that is periodically refreshed, to a stream.
We chose not to use a broadcast stream and a Flink join, because we didn’t want
to add tricky watermarks and hold back one stream until everything had been
broadcast (holding it back may build up a huge state in a window, and you don’t
always have enough control over the source function to make it wait before
emitting).
So, we developed tools that load a static in-memory hashmap cache from the
reference table in the open() method of our enrichment operator, without using
Flink streams, and launch a thread that periodically refreshes the hashmap. We
also use the same hashing mechanism as Flink so that each task manager loads only
the part of the table that is relevant to the keyed stream.
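A minimal sketch of that kind of cache, written in plain Java so it runs without Flink. RefreshingReferenceCache, the loader and the keyToSubtask function are hypothetical names, not Arnaud's actual tools. In a real rich function, start() would presumably be called from open() and close() from close(), and keyToSubtask would have to reproduce Flink's key-group assignment exactly (for example via KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism), compared with the subtask index from the runtime context). That only lines up if the key type and max parallelism are the same as in the keyBy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

/**
 * Hypothetical periodically refreshed reference cache (not a Flink class).
 * Only entries whose key maps to this subtask are kept in memory.
 */
class RefreshingReferenceCache<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> loader;        // reads the reference table (assumption)
    private final ToIntFunction<K> keyToSubtask;     // must match the keyed stream's partitioning
    private final int subtaskIndex;
    private final AtomicReference<Map<K, V>> snapshot =
            new AtomicReference<>(Collections.emptyMap());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "reference-cache-refresh");
                t.setDaemon(true); // do not keep the JVM alive just for refreshes
                return t;
            });

    RefreshingReferenceCache(Supplier<Map<K, V>> loader,
                             ToIntFunction<K> keyToSubtask,
                             int subtaskIndex) {
        this.loader = loader;
        this.keyToSubtask = keyToSubtask;
        this.subtaskIndex = subtaskIndex;
    }

    /** Loads once synchronously (so open() fails fast), then refreshes in the background. */
    void start(long period, TimeUnit unit) {
        refresh();
        scheduler.scheduleAtFixedRate(this::refreshQuietly, period, period, unit);
    }

    /** Builds a new map holding only this subtask's keys and swaps it in atomically. */
    void refresh() {
        Map<K, V> local = new HashMap<>();
        for (Map.Entry<K, V> e : loader.get().entrySet()) {
            if (keyToSubtask.applyAsInt(e.getKey()) == subtaskIndex) {
                local.put(e.getKey(), e.getValue());
            }
        }
        snapshot.set(Collections.unmodifiableMap(local));
    }

    private void refreshQuietly() {
        try {
            refresh();
        } catch (RuntimeException ex) {
            // Keep serving the previous snapshot; an uncaught exception
            // would cancel all further scheduled refreshes.
        }
    }

    V get(K key) {
        return snapshot.get().get(key);
    }

    int size() {
        return snapshot.get().size();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Swapping in a complete, immutable map through an AtomicReference means lookups never see a half-loaded table. The trade-off is that two copies of this subtask's slice are briefly held in memory during each refresh.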
IMHO this stuff should be part of the framework; it’s easier to do with Spark
Streaming… :-)
Best regards,
Arnaud
From: Lars Skjærven
Sent: Wednesday, 14 February 2024 08:12
To: user
Subject: Stream enrichment with ingest mode
Dear all,
A recurring challenge we have with stream enrichment in Flink is finding a robust
mechanism to determine that all messages of the source(s) have been
consumed/processed before output is collected.
A simple example is two sources of catalogue metadata:
- source A delivers products;
- source B delivers product categories.
For a process function that enriches the categories with the number of products
in each category, we would use a KeyedCoProcessFunction (or a RichCoFlatMap),
keyed by category ID, and put both the category and its products in state. We
then count the products in each key's state and collect the result.
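To make the per-key state concrete, here is a plain-Java simulation (not Flink API) of what such a function, keyed by category ID, would hold: the category record from source B and the product IDs from source A. CategoryProductCounter and its methods are hypothetical; in Flink the two fields would typically live in a ValueState and a MapState or ListState. Calling enriched() before every product has arrived simply returns an undercount, which is the incomplete aggregation described next.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Plain-Java stand-in for the keyed state of a two-input function keyed by
 * category ID (hypothetical class). Each map entry plays the role of one key's state.
 */
class CategoryProductCounter {
    /** State held for a single category ID. */
    private static final class KeyState {
        String categoryName;                            // from source B
        final Set<String> productIds = new HashSet<>(); // from source A; a set ignores re-deliveries
    }

    private final Map<String, KeyState> stateByCategory = new HashMap<>();

    private KeyState stateFor(String categoryId) {
        return stateByCategory.computeIfAbsent(categoryId, id -> new KeyState());
    }

    /** First input (source A): a product that belongs to a category. */
    void onProduct(String productId, String categoryId) {
        stateFor(categoryId).productIds.add(productId);
    }

    /** Second input (source B): a category record. */
    void onCategory(String categoryId, String categoryName) {
        stateFor(categoryId).categoryName = categoryName;
    }

    /** Category enriched with its current product count, or empty if the category is unknown. */
    Optional<String> enriched(String categoryId) {
        KeyState s = stateByCategory.get(categoryId);
        if (s == null || s.categoryName == null) {
            return Optional.empty();
        }
        return Optional.of(s.categoryName + ": " + s.productIds.size() + " products");
    }
}
```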
Typically, however, we don't want to start counting before all products are
included in state (to avoid emitting incomplete aggregations downstream).
Therefore we use the event lag time (i.e. processing time minus the current
watermark) to indicate the "ingest mode" of the processor (e.g. lag time > 30
seconds). When in "ingest mode", we register a timer and return without
collecting. The timer finally fires once the watermark has advanced sufficiently.
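A plain-Java sketch of the "ingest mode" check and the deferred emission. Event-time timers are simulated with a TreeMap: advancing the watermark fires every timer whose timestamp is at or below it, which mirrors how Flink fires event-time timers. The class name and the timer timestamp (processing time minus the maximum lag) are assumptions, since the message does not say where the timer is set. In a real KeyedCoProcessFunction the two inputs would come from ctx.timerService().currentProcessingTime() and currentWatermark(), and the timer would be registered with registerEventTimeTimer().

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical "ingest mode" gate with simulated event-time timers. */
class IngestModeGate {
    private final long maxLagMs;
    // timer timestamp -> keys; a set mirrors Flink's one-timer-per-(key, timestamp) rule
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    IngestModeGate(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Event lag = processing time minus current watermark. */
    boolean inIngestMode(long processingTimeMs, long currentWatermarkMs) {
        return processingTimeMs - currentWatermarkMs > maxLagMs;
    }

    /**
     * Returns true if the caller may collect now; otherwise registers a timer and
     * returns false. Assumption: the timer is set to processingTime - maxLag, the
     * watermark at which this element would not have counted as "ingest mode".
     */
    boolean onElement(String key, long processingTimeMs, long currentWatermarkMs) {
        if (!inIngestMode(processingTimeMs, currentWatermarkMs)) {
            return true;
        }
        timers.computeIfAbsent(processingTimeMs - maxLagMs, ts -> new LinkedHashSet<>()).add(key);
        return false;
    }

    /** Advances the watermark and returns the keys whose timers fire (timestamp <= watermark). */
    List<String> advanceWatermark(long watermarkMs) {
        List<String> fired = new ArrayList<>();
        NavigableMap<Long, Set<String>> due = timers.headMap(watermarkMs, true);
        for (Set<String> keys : due.values()) {
            fired.addAll(keys);
        }
        due.clear(); // the head-map view is backed by `timers`, so this removes the fired timers
        return fired;
    }
}
```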
This "ingest mode" strategy (with timers) seems to get more complicated when you
have multiple process functions (with the same need for an ingest mode)
downstream of the first processor. The reason seems to be that the first process
function forwards watermarks even though it collects no elements. Therefore, when
elements finally arrive at the second process function, its current watermark has
already advanced, so the same watermark-based strategy is less robust.
I'm curious how others in the community handle this "challenge" of initial
ingest. Any ideas are greatly appreciated.
Note: we use a custom watermark generator that emits watermarks derived from
event time, and advances the watermark when the source has been idle for a longer
period (e.g. 30 seconds).
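A sketch of such a generator, assuming a bounded-out-of-orderness rule while events flow and, once no event has been seen for idleTimeoutMs of wall-clock time, advancing the watermark to now - idleTimeoutMs. That idle rule is only a guess at what "advances the watermarks" means here. The class is hypothetical and uses a LongConsumer instead of Flink's WatermarkOutput so it runs without Flink, but the onEvent/onPeriodicEmit split follows the shape of Flink's WatermarkGenerator interface.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical event-time watermark generator that also advances while the source is idle. */
class IdleAdvancingWatermarks {
    private final long outOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock;          // processing-time clock, injectable for tests
    private long maxEventTs = Long.MIN_VALUE;
    private long lastEventWallClockMs;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAdvancingWatermarks(long outOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastEventWallClockMs = clock.getAsLong(); // idle timer starts at creation
    }

    /** Tracks the highest event timestamp seen and when the last event arrived. */
    void onEvent(long eventTimestampMs) {
        maxEventTs = Math.max(maxEventTs, eventTimestampMs);
        lastEventWallClockMs = clock.getAsLong();
    }

    /** Called periodically; emits a watermark only when it moves forward. */
    void onPeriodicEmit(LongConsumer output) {
        long now = clock.getAsLong();
        long candidate = maxEventTs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTs - outOfOrdernessMs - 1;
        if (now - lastEventWallClockMs >= idleTimeoutMs) {
            // Assumption: an idle source is treated as caught up to (now - idleTimeout).
            candidate = Math.max(candidate, now - idleTimeoutMs);
        }
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            output.accept(candidate);
        }
    }
}
```

This differs from WatermarkStrategy.withIdleness(), which marks an idle input as idle so that it stops holding back the downstream watermark, rather than moving its own watermark forward.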
Thanks!
L