Hi Ron,

> I’m joining two streams - one is a “decoration” stream that we have in a
> compacted Kafka topic, produced using a view on a MySQL table AND using
> Kafka Connect; the other is the “event data” we want to decorate, coming in
> over time via Kafka. These streams are keyed the same way - via an “id”
> field, and we join them using CoFlatMap that attaches the data from the
> decoration stream to the event data and publishes downstream.
>
> What I’d like to do in my CoFlatMap is wait to process the event data
> until we’ve received the corresponding decoration data. How do I make that
> happen?
>
>
I have exactly the same scenario and have researched this. Basically, what we
need is
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API,
but unfortunately it seems like nobody is actively working on it. You could
check the ideas there, though.

Side inputs seem to be implemented in the Beam API and should work on the
Flink runner, but I didn’t try it for a number of reasons.

I ended up with the following solution:

- in the script starting the job, I create a Stop file on a shared filesystem
(HDFS)
- I created 2 SourceFunctions extending the Kafka source
- in the source function for the “decoration” stream, the run method consumes
all records from the compacted topic. The tricky part is how to identify that
everything has already been consumed. I resolved it by reading the Kafka end
offsets directly with the Kafka admin API and checking whether I have reached
those offsets. After waiting a bit to make sure the events have propagated to
the next operator, I delete the Stop file on the shared filesystem (see the
first sketch after this list)
- in the source function for the event data, I implemented the “open” method
to wait until the Stop file is deleted. This keeps it from consuming event
data before the decoration data is loaded (second sketch below)
- in the pipeline I broadcast the “decoration” events and used a
CoProcessFunction to store them in state and enrich the main event stream
(third sketch below)
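
To give an idea, the decoration source looks roughly like this. It is just a
sketch, not my exact code: it is simplified to a plain SourceFunction around a
KafkaConsumer rather than extending the Flink Kafka source, the class name and
stop file path are made up, and I use the consumer’s endOffsets() here while
in my job I read the end offsets via the admin client - the idea is the same:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Reads the compacted decoration topic up to its end offsets, then deletes the stop file. */
public class DecorationSource implements SourceFunction<String> {

    private final Properties kafkaProps;
    private final String topic;
    private final String stopFilePath;   // e.g. "hdfs:///jobs/enrichment/STOP"
    private volatile boolean running = true;

    public DecorationSource(Properties kafkaProps, String topic, String stopFilePath) {
        this.kafkaProps = kafkaProps;
        this.topic = topic;
        this.stopFilePath = stopFilePath;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic)
                    .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // Snapshot the current end offsets; once every partition has reached
            // them, the compacted topic has been read completely.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            boolean caughtUp = false;
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record.value());
                    }
                }
                if (!caughtUp && reachedEnd(consumer, partitions, endOffsets)) {
                    caughtUp = true;
                    // Wait a bit so the decoration events reach the downstream
                    // operator, then release the event-data source by deleting
                    // the stop file.
                    Thread.sleep(5_000);
                    FileSystem fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
                    fs.delete(new Path(stopFilePath), false);
                }
            }
        }
    }

    private boolean reachedEnd(KafkaConsumer<String, String> consumer,
                               List<TopicPartition> partitions,
                               Map<TopicPartition, Long> endOffsets) {
        for (TopicPartition tp : partitions) {
            if (consumer.position(tp) < endOffsets.get(tp)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void cancel() {
        running = false;
    }
}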
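
The gate in the event-data source is then just a loop in open(), roughly like
this (again simplified, names made up):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Event-data source that does not start consuming until the stop file is gone. */
public abstract class GatedEventSource<T> extends RichSourceFunction<T> {

    private final String stopFilePath;

    protected GatedEventSource(String stopFilePath) {
        this.stopFilePath = stopFilePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        FileSystem fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
        Path stopFile = new Path(stopFilePath);
        // Block until the decoration source has deleted the stop file, i.e. the
        // compacted topic has been fully consumed and propagated downstream.
        while (fs.exists(stopFile)) {
            Thread.sleep(1_000);
        }
    }
}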
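
The enrichment itself is a plain CoProcessFunction over the event stream
connected with the broadcast decoration stream, something like the sketch
below (Event, Decoration and EnrichedEvent are placeholders for our own
types, each keyed by the same “id” field):

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/** Keeps broadcast decoration records in memory and enriches incoming events. */
public class DecorationEnricher extends CoProcessFunction<Event, Decoration, EnrichedEvent> {

    // One copy per parallel subtask; not checkpointed, so after a restart it is
    // rebuilt by re-reading the compacted topic.
    private final Map<String, Decoration> decorationsById = new HashMap<>();

    @Override
    public void processElement1(Event event, Context ctx, Collector<EnrichedEvent> out) {
        Decoration decoration = decorationsById.get(event.getId());
        if (decoration != null) {
            out.collect(new EnrichedEvent(event, decoration));
        }
    }

    @Override
    public void processElement2(Decoration decoration, Context ctx, Collector<EnrichedEvent> out) {
        decorationsById.put(decoration.getId(), decoration);
    }
}

In the pipeline this is wired up as
eventStream.connect(decorationStream.broadcast()).process(new DecorationEnricher()),
so every parallel subtask sees all decoration records.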

The application is not in production yet as I need to do more testing, but
it seems to work.

Additionally, I tried to cache the decoration data in the source function’s
state so it can be recovered from a checkpoint easily, but I’m still not sure
whether it’s better to read it from the compacted topic every time, keep an
additional cache in the source function, or whether the state in the
CoProcessFunction is enough.
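
If I go the source-state route, it would probably use operator state via
CheckpointedFunction, roughly like this (just a sketch; Decoration is again a
placeholder type, and run()/cancel() would consume the compacted topic and add
each record to the cache before collecting it):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;

/** Sketch: decoration source that snapshots everything it has emitted as operator state. */
public abstract class CachingDecorationSource
        implements SourceFunction<Decoration>, CheckpointedFunction {

    // Everything emitted so far.
    private final List<Decoration> cache = new ArrayList<>();
    private transient ListState<Decoration> checkpointedCache;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCache = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("decoration-cache", Decoration.class));
        if (context.isRestored()) {
            // After a restore, the cache can be replayed instead of re-reading the topic.
            for (Decoration d : checkpointedCache.get()) {
                cache.add(d);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCache.clear();
        checkpointedCache.addAll(cache);
    }
}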

Hope this helps, and I would be interested to hear about your experience.

Regards,
Maxim.
