Hi Ron,

> I’m joining two streams - one is a “decoration” stream that we have in a
> compacted Kafka topic, produced using a view on a MySQL table and using
> Kafka Connect; the other is the “event data” we want to decorate, coming in
> over time via Kafka. These streams are keyed the same way - via an “id”
> field, and we join them using a CoFlatMap that attaches the data from the
> decoration stream to the event data and publishes downstream.
>
> What I’d like to do in my CoFlatMap is wait to process the event data
> until we’ve received the corresponding decoration data. How do I make that
> happen?

I have exactly the same scenario and have researched this. Basically, what we need is https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API, but unfortunately it seems that nobody is actively working on it. You could check the ideas there, though.
Side inputs seem to be implemented in the Beam API and should work on the Flink runner, but I didn’t try it, for a number of reasons. I ended up with the following solution:

- In the script starting the job, I create a Stop file on a shared filesystem (HDFS).
- I created two SourceFunctions extending the Kafka source.
- In the source function for the “decoration” stream, the run method consumes all records from the compacted topic. The tricky part is identifying when everything has already been consumed; I resolved this by reading the Kafka end offsets directly via the Kafka admin API and checking whether I have reached them (see the offset-check sketch below). After waiting a bit to make sure the events have propagated to the next operator, I delete the Stop file on the shared filesystem.
- In the source function for the event data, I implemented the “open” method to wait until the Stop file is deleted; this keeps it from consuming event data until the decoration data is in place (see the gated-source sketch below).
- In the pipeline, I broadcast the “decoration” stream and used a CoProcessFunction to store it in state and enrich the main event stream (see the enrichment sketch below).

The application is not in production yet, as I need to do more testing, but it seems to work. Additionally, I tried to cache the decoration data in the state of the source function so it can recover easily from a checkpoint, but I’m still not sure whether it’s better to read everything from the compacted topic every time, to keep an additional cache in the source function, or whether the state in the CoProcessFunction is enough.

Hope this helps, and I would be interested to hear about your experience.

Regards,
Maxim.
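
P.S. A rough sketch of the offset check, using a plain KafkaConsumer rather than the Flink connector internals or the admin client, just to show the idea. The class, method and variable names are illustrative only, not our actual code:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CompactedTopicReader {

    /** Reads the topic from the beginning and returns once the end offsets
     *  captured at start-up have been reached on every partition. */
    public static Map<String, String> readToEnd(Properties consumerProps, String topic) {
        Map<String, String> decorationById = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic)
                    .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // End offsets as of now; records appended later are handled by the
            // normal streaming path, not by this catch-up phase.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            Set<TopicPartition> remaining = new HashSet<>(partitions);
            while (!remaining.isEmpty()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    decorationById.put(r.key(), r.value());
                }
                // A partition is "done" once our position has reached its end offset.
                remaining.removeIf(tp -> consumer.position(tp) >= endOffsets.get(tp));
            }
        }
        return decorationById;
    }
}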
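
A sketch of the “gated” event-data source that waits in open() until the Stop file disappears. I’m assuming the universal Kafka connector (FlinkKafkaConsumer, Flink 1.7+) here; the stop-file path and the polling interval are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GatedKafkaSource<T> extends FlinkKafkaConsumer<T> {

    private final String stopFilePath;  // e.g. "hdfs:///tmp/my-job/STOP" (placeholder)

    public GatedKafkaSource(String topic, DeserializationSchema<T> schema,
                            Properties props, String stopFilePath) {
        super(topic, schema, props);
        this.stopFilePath = stopFilePath;
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // Block until the decoration source has deleted the Stop file,
        // i.e. until the compacted topic has been fully replayed.
        Path stopFile = new Path(stopFilePath);
        FileSystem fs = stopFile.getFileSystem(new org.apache.hadoop.conf.Configuration());
        while (fs.exists(stopFile)) {
            Thread.sleep(1000L);
        }
        super.open(configuration);
    }
}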
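
And a sketch of the broadcast-and-enrich step. Event, Decoration and EnrichedEvent are placeholder types, and the per-subtask HashMap stands in for whatever state you end up using - which is exactly the checkpoint-recovery question I mentioned above:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class DecorationJob {

    public static DataStream<EnrichedEvent> enrich(DataStream<Event> events,
                                                   DataStream<Decoration> decorations) {
        return events
                .connect(decorations.broadcast())   // every subtask sees all decoration records
                .process(new EnrichFunction());
    }

    static class EnrichFunction extends CoProcessFunction<Event, Decoration, EnrichedEvent> {

        // Per-subtask cache of decoration data, keyed by id.
        private transient Map<String, Decoration> decorationById;

        @Override
        public void open(Configuration parameters) {
            decorationById = new HashMap<>();
        }

        @Override
        public void processElement1(Event event, Context ctx, Collector<EnrichedEvent> out) {
            Decoration d = decorationById.get(event.getId());
            if (d != null) {
                out.collect(new EnrichedEvent(event, d));
            }
            // else: drop, buffer, or emit unenriched - depends on your requirements
        }

        @Override
        public void processElement2(Decoration decoration, Context ctx, Collector<EnrichedEvent> out) {
            decorationById.put(decoration.getId(), decoration);
        }
    }

    // Placeholder types, only to make the sketch self-contained.
    public static class Event { public String id; public String getId() { return id; } }
    public static class Decoration { public String id; public String getId() { return id; } }
    public static class EnrichedEvent {
        public EnrichedEvent(Event e, Decoration d) { /* combine fields as needed */ }
    }
}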