Hi,
I’m trying to configure exactly-once recovery for Beam pipelines running on
top of Flink with Kafka sources and sinks.
I wrote two simple pipelines that pass messages from one topic to another (for
testing purposes):
* pipeline1 reads messages from topic_a and writes them to topic_b
* pipeline2 reads messages from topic_b and writes them to topic_c
I tried to configure exactly-once behavior in both pipelines with the
following settings (a rough sketch of pipeline1 appears after this list):
* execution.checkpointing.mode: EXACTLY_ONCE
* KafkaIO.read().withReadCommitted()
* KafkaIO.writeRecords().withEOS()
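Concretely, pipeline1 looks roughly like this. It's a simplified sketch:
String keys and values, a placeholder broker address ("kafka:9092"), and an
arbitrary EOS shard count and sink group id ("pipeline1-eos-group"); the
Flink runner and checkpointing settings are passed externally as described
above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Pipeline1 {
  public static void main(String[] args) {
    // Runner and checkpointing flags (Flink runner, EXACTLY_ONCE checkpointing
    // mode) come from the command line / flink-conf.yaml.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadTopicA", KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")              // placeholder broker
            .withTopic("topic_a")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withReadCommitted()   // isolation.level=read_committed on the main consumer
            .withoutMetadata())
     .apply("ToProducerRecords", MapElements.via(
            new SimpleFunction<KV<String, String>, ProducerRecord<String, String>>() {
              @Override
              public ProducerRecord<String, String> apply(KV<String, String> kv) {
                return new ProducerRecord<>("topic_b", kv.getKey(), kv.getValue());
              }
            }))
     .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
     .apply("WriteTopicB", KafkaIO.<String, String>writeRecords()
            .withBootstrapServers("kafka:9092")
            .withTopic("topic_b")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withEOS(1, "pipeline1-eos-group"));   // arbitrary shard count / sink group id

    p.run();
  }
}

pipeline2 is identical except that it reads from topic_b and writes to topic_c.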
I started running both pipelines and then killed the task manager of
pipeline1. After it recovered, it re-read messages from topic_a that had
already been processed; I assume the restart offset came from the last
checkpoint. pipeline2 then started receiving messages from topic_b that it
had already processed, so in practice I got at-least-once behavior instead of
exactly-once.
I noticed that pipeline2 had two Kafka consumers (even though there is only
one KafkaIO.read()): one named after my pipeline, and a second one created in
KafkaIOUtils.getOffsetConsumerConfig() with a hard-coded
isolation.level=read_uncommitted.
I wrote similar pipelines directly in Flink and managed to achieve
exactly-once behavior with the following settings (a sketch appears after
this list):
* execution.checkpointing.mode: EXACTLY_ONCE
* isolation.level: read_committed
* KafkaSink.builder().setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
* KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")
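For comparison, the Flink-native version of pipeline1 looks roughly like
this, with the same simplifications (String values, placeholder broker
address and group id, an arbitrary 60s checkpoint interval) plus an explicit
transaction.timeout.ms, which has to stay below the broker's
transaction.max.timeout.ms for EXACTLY_ONCE to work:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkPipeline1 {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Same effect as execution.checkpointing.mode: EXACTLY_ONCE in flink-conf.yaml.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")                  // placeholder broker
        .setTopics("topic_a")
        .setGroupId("pipeline1")                            // placeholder group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("isolation.level", "read_committed")   // skip uncommitted records
        .build();

    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic_b")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-trx-id-prefix")
        // Must stay below the broker's transaction.max.timeout.ms:
        .setProperty("transaction.timeout.ms", "600000")
        .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic_a")
       .sinkTo(sink);
    env.execute("pipeline1-flink");
  }
}

With this version, killing and recovering the first pipeline's task manager
did not produce duplicates downstream.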
I’m trying to figure out several things.
1. Why are there two consumers in the pipeline, and why is one of them
hard-coded to read_uncommitted? Is that the reason pipeline2 receives
duplicate events after pipeline1 recovers?
2. Is KafkaIO's exactly-once implementation equivalent to Flink's KafkaSink
implementation?
3. Is there anything else I missed? How can I make this work in Beam?
Thanks,
Ifat