Hi, I’m trying to configure exactly-once recovery for pipelines running on Flink with Kafka sources and sinks.
I wrote two simple pipelines that pass messages from one topic to another (for testing purposes):

* pipeline1 reads messages from topic_a and writes them to topic_b
* pipeline2 reads messages from topic_b and writes them to topic_c

I tried to configure exactly-once behavior in both pipelines with:

* execution.checkpointing.mode: EXACTLY_ONCE
* KafkaIO.read().withReadCommitted()
* KafkaIO.writeRecords().withEOS()

I started both pipelines and then killed the task manager of pipeline1. After it recovered, it re-read messages from topic_a that had already been processed; I assume the offset was taken from the last checkpoint. Pipeline2 then started receiving messages from topic_b that it had already processed. So in practice I got at-least-once behavior instead of exactly-once.

I noticed that pipeline2 has two consumers (even though there is only one KafkaIO.read()): one named after my pipeline, and another created in KafkaIOUtils.getOffsetConsumerConfig() with a hard-coded isolation.level=read_uncommitted.

I wrote similar pipelines directly in Flink and managed to achieve exactly-once behavior using:

* execution.checkpointing.mode: EXACTLY_ONCE
* isolation.level: read_committed
* KafkaSink.builder().setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
* KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")

I’m trying to figure out several things:

1. Why are there two consumers in the pipeline, and why is one of them hard-coded to read_uncommitted? Is that the reason pipeline2 gets duplicate events after pipeline1 recovers?
2. Is KafkaIO's exactly-once implementation equivalent to Flink's KafkaSink implementation?
3. Is there anything else I missed? How can I make this work in Beam?

Thanks,
Ifat
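P.S. For reference, here is a minimal sketch of how pipeline1 is wired (topic_a -> topic_b) with the exactly-once settings described above. The broker address, String serializers, the pass-through DoFn, and the withEOS() shard count and sink group id are placeholders I chose for illustration, not something from my production setup:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Pipeline p = Pipeline.create(options);

p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("kafka:9092")              // placeholder broker
        .withTopic("topic_a")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withReadCommitted())                            // sets isolation.level=read_committed
 // Re-wrap each consumed record as a ProducerRecord for topic_b
 .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, ProducerRecord<String, String>>() {
     @ProcessElement
     public void processElement(ProcessContext c) {
         KafkaRecord<String, String> r = c.element();
         c.output(new ProducerRecord<>("topic_b", r.getKV().getKey(), r.getKV().getValue()));
     }
 }))
 .apply(KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("kafka:9092")              // placeholder broker
        .withTopic("topic_b")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(1, "pipeline1-eos-group"));             // placeholder shard count / sink group id

p.run();
```

pipeline2 is identical apart from the topic names (topic_b -> topic_c) and a distinct withEOS() sink group id.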