Hi All,

I would like to know if there's a recommended pattern for the scenario below.
TL;DR: reading state from Kafka.

I have a scenario where I'm listening to a Kafka topic and generating a
unique id based on the properties of the incoming item. Then, I write the
result to another Kafka topic. The tricky part is that when the pipeline is
restarted, I have to read the output topic and rebuild the id state. That
way, if I see an item that was already given an id, I give it the same id
instead of generating a new one.

For example:
Input topic -> Output topic
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
(A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
pipeline is restarted
(A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")  <-- because we've already seen (A1, B1, C1) before
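To make the required behaviour concrete, here is a minimal plain-Java sketch of the id state, outside of Beam. It assumes each item is identified by its three properties and that a random UUID is an acceptable "random string". `IdRegistry`, `seedFromOutput` and `assignId` are hypothetical names, not Beam or Kafka APIs. A restart is modelled by creating a fresh registry and seeding it from the records already in the output topic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of the id state; this is not a Beam or Kafka API.
final class IdRegistry {
    // Key: the item's identifying properties (A, B, C); value: the id assigned to them.
    private final Map<List<String>, String> idsByKey = new HashMap<>();

    // After a restart, rebuild the state from records already in the output topic.
    // Each record is modelled as {A, B, C, id}.
    void seedFromOutput(List<String[]> outputRecords) {
        for (String[] rec : outputRecords) {
            idsByKey.put(List.of(rec[0], rec[1], rec[2]), rec[3]);
        }
    }

    // Reuse the id already assigned to (a, b, c), or generate and remember a new random one.
    String assignId(String a, String b, String c) {
        return idsByKey.computeIfAbsent(List.of(a, b, c), k -> UUID.randomUUID().toString());
    }
}
```

Replaying the example: (A1, B1, C1) and (A1, B1, C2) get two different ids. After the "restart", (A3, B3, C3) gets a third id, and (A1, B1, C1) gets its original id back.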

I can't really use any kind of windowing except the global window, as I need
to join against all the items in the output topic (the ones with the already
generated ids).

Right now, I flatten the input and output topics, apply a trigger on the
global window
(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))),
and then group by the properties (A, B, C). Once that is done, I look through
the grouped rows to see whether any of them already has a generated id. If
so, all the other rows get that id, and the id is saved in the ParDo's state
for future messages. If not, I generate a new id.

My solution seems to work. Kind of...

This adds a 10s delay to every incoming message, which I'd prefer to avoid.
I would like to read the output topic at the start of the pipeline, build the
state, and only then start processing the input topic. Since the output topic
receives no new records until I start processing the input topic again, it is
effectively a bounded collection. Unfortunately, because it is read through
KafkaIO, it is still considered an unbounded source, which mainly means that
Wait.on() on this collection waits forever. (Note: I've read the notes in the
documentation [1] but either don't understand them or didn't take the
appropriate steps for Wait.on() to trigger properly.)
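The ordering being asked for ("build the state, then process the input") can be modelled in plain Java to show what has to happen. This illustrates the requirement only. It is not how to express it in Beam, and `BootstrappingAssigner`, `finishBootstrap` and the comma-joined string key are hypothetical. Input records that arrive before the snapshot is finished are buffered, then drained once the state has been rebuilt. The model depends on an explicit end-of-snapshot signal, which is exactly what an unbounded KafkaIO read does not provide.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical plain-Java model of "rebuild the state first, then process input";
// it is not Beam's Wait.on(). Keys are assumed to encode (A, B, C), e.g. "A1,B1,C1".
final class BootstrappingAssigner {
    private final Map<String, String> idsByKey = new HashMap<>();
    private final Deque<String> pending = new ArrayDeque<>(); // input seen before the state is ready
    private final List<String[]> emitted = new ArrayList<>(); // {key, id} pairs written to the output
    private boolean bootstrapped = false;

    // Phase 1: a record read back from the output topic.
    void onOutputRecord(String key, String id) {
        idsByKey.put(key, id);
    }

    // End of the output-topic snapshot: from now on the state is complete.
    void finishBootstrap() {
        bootstrapped = true;
        while (!pending.isEmpty()) {
            emit(pending.poll());
        }
    }

    // Phase 2: an input record, held back until the state has been rebuilt.
    void onInputRecord(String key) {
        if (bootstrapped) {
            emit(key);
        } else {
            pending.add(key);
        }
    }

    private void emit(String key) {
        String id = idsByKey.computeIfAbsent(key, k -> UUID.randomUUID().toString());
        emitted.add(new String[] {key, id});
    }

    List<String[]> emitted() {
        return emitted;
    }
}
```

Without the buffering, an input (A1, B1, C1) read before its output-topic record would wrongly receive a fresh id.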

I have also tried windowing the output topic into session windows with a
one-second gap. The idea is that if I don't receive any item for 1 second, I
have finished reading the output topic and can start processing the input
topic. Unfortunately, Wait.on() doesn't work with session windows.
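The session-gap idea boils down to a "quiet period" heuristic, sketched here in plain Java. `QuietPeriodDetector` and `snapshotLooksComplete` are hypothetical names. The detector assumes it starts counting from the moment the read begins, so an empty output topic also counts as finished after one gap. It remains a heuristic: any pause of at least one gap while consuming the topic (for example, consumer lag) would end the snapshot early.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of "no output record for `gap` means the snapshot is finished".
final class QuietPeriodDetector {
    private final Duration gap;
    private Instant lastSeen;

    QuietPeriodDetector(Duration gap, Instant start) {
        this.gap = gap;
        this.lastSeen = start; // counting starts when the read begins
    }

    // Called for every record read back from the output topic.
    void onOutputRecord(Instant arrival) {
        if (arrival.isAfter(lastSeen)) {
            lastSeen = arrival;
        }
    }

    // True once at least `gap` has passed since the last record (or since the start).
    boolean snapshotLooksComplete(Instant now) {
        return Duration.between(lastSeen, now).compareTo(gap) >= 0;
    }
}
```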

Furthermore, I don't think side inputs work for this problem. First, because
I'm not sure how to create a side input from an unbounded source. Second,
because the side input would need to be updated whenever a new id is
generated.

I would appreciate any thoughts or ideas to elegantly solve this problem.

Thanks,
Cristian

[1]
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
