Hi Cristian,

I didn't try that, so I'm not 100% sure it would work, but you could probably try using a custom timestamp policy [1] for the KafkaIO, which would advance the watermark to BoundedWindow.TIMESTAMP_MAX_VALUE once you know you have reached the head of the state topic. That would probably require reading the end offsets before running the Pipeline. This should effectively turn the source into a bounded one.
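An untested sketch of what I mean (all names here are made up; it assumes a single state topic with String keys and values, and that endOffsets, a partition -> end-offset map, was fetched with a plain KafkaConsumer via consumer.endOffsets(...) before submitting the Pipeline):

import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

/** Moves the watermark to TIMESTAMP_MAX_VALUE once a partition's pre-read end offset is reached. */
class EndOfTopicTimestampPolicyFactory implements TimestampPolicyFactory<String, String> {

  // Partition -> end offset of the state topic, read before the pipeline starts.
  private final Map<Integer, Long> endOffsets;

  EndOfTopicTimestampPolicyFactory(Map<Integer, Long> endOffsets) {
    this.endOffsets = endOffsets;
  }

  @Override
  public TimestampPolicy<String, String> createTimestampPolicy(
      TopicPartition tp, Optional<Instant> previousWatermark) {

    long endOffset = endOffsets.getOrDefault(tp.partition(), 0L);

    return new TimestampPolicy<String, String>() {
      // An empty partition is already "done"; otherwise start from the previous/minimum watermark.
      private Instant watermark =
          endOffset == 0L
              ? BoundedWindow.TIMESTAMP_MAX_VALUE
              : previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);

      @Override
      public Instant getTimestampForRecord(
          PartitionContext ctx, KafkaRecord<String, String> record) {
        Instant ts = new Instant(record.getTimestamp());
        if (record.getOffset() >= endOffset - 1) {
          // Last pre-existing record of this partition seen: declare it finished.
          watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        } else {
          watermark = ts;
        }
        return ts;
      }

      @Override
      public Instant getWatermark(PartitionContext ctx) {
        return watermark;
      }
    };
  }
}

You would then pass it to the read of the state topic with .withTimestampPolicyFactory(new EndOfTopicTimestampPolicyFactory(endOffsets)); once every partition's watermark is at TIMESTAMP_MAX_VALUE, the global window of that collection can close and Wait.on() has a chance to fire.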

 Jan

[1] https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-

On 7/22/21 2:14 PM, Cristian Constantinescu wrote:
Hi All,

I would like to know if there's a suggested pattern for the below scenario. TL;DR: reading state from Kafka.

I have a scenario where I'm listening to a Kafka topic and generating a unique id based on the properties of the incoming item. Then I output the result to another Kafka topic. The tricky part is that when the pipeline is restarted, I have to read the output topic and rebuild the id state, so that if I see an item that was already given an id, I give the same id back instead of generating a new one.

For example:
Input topic -> Output topic
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
(A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
pipeline is restarted
(A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1") <-- because we've already seen (A1, B1, C1) before

I can't really use any type of window except the global window, as I need to join against all the items of the output topic (the one with the already generated ids).

Right now, I flatten both the input and output topics and use a trigger on the global window, AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)), then group by the properties (A, B, C). Once that is done, I look through the grouped rows and check whether any of them already has an id. If yes, all the other rows get that id and the id is saved in the ParDo's state for future messages. If not, I generate a new id.
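Roughly, the relevant part looks like this (simplified, with made-up names; both topics are already decoded into a common Record type that carries the A/B/C properties):

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// inputRecords / outputRecords are the two KafkaIO reads, already mapped to Record.
PCollection<Record> combined =
    PCollectionList.of(inputRecords).and(outputRecords)
        .apply(Flatten.pCollections())
        .apply(
            Window.<Record>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

// Key by the (A, B, C) properties and group.
PCollection<KV<String, Iterable<Record>>> grouped =
    combined
        .apply(
            WithKeys.<String, Record>of(r -> r.getA() + "|" + r.getB() + "|" + r.getC())
                .withKeyType(TypeDescriptors.strings()))
        .apply(GroupByKey.<String, Record>create());

// AssignIdsFn (made-up name) is the stateful DoFn that reuses an id found in the
// group (or in its state), otherwise generates a new one.
grouped.apply(ParDo.of(new AssignIdsFn()));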

My solution seems to work. Kind of...

This puts a delay of 10s on all the incoming messages, which I'd prefer to avoid. I would like to read the output topic at the start of the pipeline, build the state, then start processing the input topic. Since the output topic will be stale until I start processing the input topic again, it is effectively a bounded collection. Unfortunately, because it comes from KafkaIO, it is still considered an unbounded source, which mainly means that Wait.on() applied to this collection waits forever. (Note: I've read the notes in the documentation [1] but either do not understand them or didn't take the appropriate steps for Wait.on() to trigger properly.)
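In other words, what I was hoping for is something like this (same made-up names as above; outputRecords is the KafkaIO read of the output topic, inputRecords the read of the input topic):

import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

// Only start processing the input topic once the output topic has been fully
// consumed and the id state rebuilt.
PCollection<Record> gated = inputRecords.apply("WaitForState", Wait.on(outputRecords));

// With an unbounded KafkaIO read in the global window, the watermark of
// outputRecords never reaches the end of the window, so this waits forever.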

I have also tried to window the output topic into session windows with a one-second gap. Basically, if I don't get any item for one second, it means I have finished reading the output topic and can start processing the input topic. Unfortunately, Wait.on() doesn't work with session windows.
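That attempt looked roughly like this (same made-up names):

import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Idea: one second of silence on the output topic means we've caught up.
PCollection<Record> sessioned =
    outputRecords.apply(
        Window.into(Sessions.withGapDuration(Duration.standardSeconds(1))));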

Furthermore, I don't think side inputs work for this problem. First, I'm not sure how to create the side input from an unbounded source. Second, the side input would need to be updated whenever a new id is generated.

I would appreciate any thoughts or ideas to elegantly solve this problem.

Thanks,
Cristian

[1] https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
