Hi,

I’m trying to configure exactly-once recovery for pipelines running on top of 
Flink with Kafka sources and sinks.

I wrote two simple pipelines that pass messages from one topic to another (for 
testing purposes):
pipeline1 reads messages from topic_a and writes them to topic_b
pipeline2 reads messages from topic_b and writes them to topic_c

I tried to configure exactly-once behavior in both pipelines by setting (a 
sketch of pipeline1 follows the list):

  *   execution.checkpointing.mode: EXACTLY_ONCE
  *   KafkaIO.read().withReadCommitted()
  *   KafkaIO.writeRecords().withEOS()
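
For reference, here is roughly what pipeline1 looks like. The broker address, 
transform names, and sink group id are placeholders, and the checkpointing 
mode is set in the Flink configuration rather than in code:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // options carry --runner=FlinkRunner; execution.checkpointing.mode is
    // set to EXACTLY_ONCE in the Flink configuration, not here.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply("ReadFromTopicA",
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")   // placeholder broker
                .withTopic("topic_a")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withReadCommitted()       // isolation.level=read_committed
                .withoutMetadata())
     .apply("ToProducerRecord",
            MapElements.into(new TypeDescriptor<ProducerRecord<String, String>>() {})
                .via((KV<String, String> kv) ->
                    new ProducerRecord<>("topic_b", kv.getKey(), kv.getValue())))
     .setCoder(ProducerRecordCoder.of(
            NullableCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of()))
     .apply("WriteToTopicB",
            KafkaIO.<String, String>writeRecords()
                .withBootstrapServers("localhost:9092")
                .withTopic("topic_b")
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                .withEOS(1, "pipeline1-eos-sink"));  // numShards, sinkGroupId

    p.run();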

I started running both pipelines and then killed the task manager of pipeline1. 
After it recovered, it re-read messages from topic_a that had already been 
processed; I assume the offsets were rewound to the last checkpoint. Pipeline2 
then received messages from topic_b that it had already processed. So in 
practice I got at-least-once behavior instead of exactly-once.

I noticed that pipeline2 has two Kafka consumers, even though there is only 
one KafkaIO.read(): one named after my pipeline, and a second one created in 
KafkaIOUtils.getOffsetConsumerConfig() with a hard-coded 
isolation.level=read_uncommitted.
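
(For completeness: as far as I can tell, overriding the property explicitly on 
the read transform only changes the main consumer, and the offset consumer 
still replaces isolation.level. A minimal example of such an override, with 
the rest of the read configuration as above:)

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    KafkaIO.<String, String>read()
        .withConsumerConfigUpdates(
            Map.of(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"))
        // ... rest of the read configuration as above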

I wrote similar pipelines in native Flink and managed to achieve exactly-once 
behavior using the following settings (sketched after the list):

  *   execution.checkpointing.mode: EXACTLY_ONCE
  *   isolation.level: read_committed
  *   KafkaSink.builder().setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  *   KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")
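
For comparison, here is roughly what the native Flink job looks like. The 
broker address, group id, checkpoint interval, and transaction timeout are 
placeholders:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")   // placeholder broker
        .setTopics("topic_a")
        .setGroupId("pipeline1")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("isolation.level", "read_committed")  // skip uncommitted records
        .build();

    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic_b")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)  // transactional producer
        .setTransactionalIdPrefix("my-trx-id-prefix")         // unique per job
        // must stay below the broker's transaction.max.timeout.ms
        .setProperty("transaction.timeout.ms", "600000")
        .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic_a-source")
       .sinkTo(sink);

    env.execute("pipeline1");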

I’m trying to figure out several things:

  1.  Why are there two consumers in the pipeline, and why is one of them 
hard-coded to read_uncommitted? Is that why pipeline2 gets duplicate events 
after pipeline1 recovers?
  2.  Is KafkaIO’s exactly-once implementation equivalent to Flink’s KafkaSink 
implementation?
  3.  Is there anything else I missed? How can I make this work in Beam?

Thanks,
Ifat
