Hi Ifat,

I don't work with Beam and Flink anymore, so the following is from memory.

When I went through what you are going through, I spent quite some time
trying to get EOS working. While it kind of worked, I ran into several
issues, which I'll describe below.

The issue with EOS is that it relies on Kafka transactions. If you're using
the Flink state, the transaction id is saved there, and when your pipeline
restarts, all kinds of issues pop up because of that. Furthermore, I never
managed to get more than one record per transaction, so your offsets will
not be sequential. Finally, performance was very poor.
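
For reference, this is roughly how the EOS sink is enabled (a sketch from
memory of Beam's KafkaIO Java API; the broker address, topic, shard count,
and sink group id are illustrative):

```java
// Sketch only, not a tested pipeline. withEOS() switches KafkaIO to its
// Kafka-transaction-based exactly-once sink; the transactional state
// (including ids) then gets checkpointed into the Flink state.
records.apply(
    KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("broker:9092")        // illustrative address
        .withTopic("topic_b")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(1, "my-sink-group"));              // numShards, sinkGroupId
```

It was exactly that checkpointed transactional state that gave me trouble on
restarts.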

The somewhat working solution is to forget EOS and rely on the Flink state
alone. What that means is that Beam KafkaIO will use the Flink state to
know where to start reading when the pipeline restarts (again, my memory is
fuzzy). What happens to in-flight records (those that have already been
read when the pipeline is stopped) will depend on how your transform
manages its state.
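
Concretely, the "forget EOS" setup amounted to running with Flink
checkpointing turned on and letting KafkaIO restore its read position from
the checkpoint (a sketch, assuming Beam's FlinkPipelineOptions; the flag
names are from memory and the interval value is illustrative):

```
--runner=FlinkRunner
--checkpointingMode=EXACTLY_ONCE
--checkpointingInterval=60000
```

With that in place the sink is a plain, non-transactional write, so
downstream consumers may still see duplicates for in-flight records, but
the read position itself survives restarts.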

There's another pitfall, though. Beam's integration with Flink's state is
outdated. You can have a look at the mailing list; I sent a few emails
about the topic [1]. Things like schema evolution are tricky, and sometimes
even filesystem paths are saved in the state, which reduces portability.

Anyway, I found that Beam running on Flink is a path filled with pitfalls.
Use caution.

Hopefully my 2 cents are of help.

Cheers,
Cristian

[1] https://lists.apache.org/list?user@beam.apache.org:lte=4y:Flink%20state

On Wed, May 10, 2023 at 11:03 PM Ifat Afek (Nokia) <ifat.a...@nokia.com>
wrote:

> Hi,
>
>
>
> I’m trying to configure exactly-once recovery for pipelines that are
> running on top of Flink with Kafka sources and sinks.
>
>
>
> I wrote two simple pipelines that pass messages from one topic to another
> (for testing purposes):
>
> pipeline1 reads messages from topic_a and writes them to topic_b
>
> pipeline2 reads messages from topic_b and writes them to topic_c
>
>
>
> I tried to configure exactly-once behavior in both pipelines by:
>
>    - execution.checkpointing.mode: EXACTLY_ONCE
>    - KafkaIO.read().withReadCommitted()
>    - KafkaIO.writeRecords().withEOS()
>
>
>
> I started running both pipelines and then killed the task manager of
> pipeline1. After it recovered, it started reading messages that were
> already processed from topic_a. I assume the offset was determined by the
> last checkpoint. Then, pipeline2 started receiving messages that were
> already processed from topic_b. So in practice I got at-least-once behavior
> instead of exactly-once.
>
>
>
> I noticed that on pipeline2, there were two consumers (even though there
> is only one KafkaIO.read()): one named after my pipeline, and the other was
> created in KafkaIOUtils.getOffsetConsumerConfig() with hard-coded
> isolation.level=read_uncommitted.
>
>
>
> I wrote similar pipelines in Flink and managed to achieve exactly-once
> behavior using:
>
>    - execution.checkpointing.mode: EXACTLY_ONCE
>    - isolation.level: read_committed
>    -
>    KafkaSink.builder().setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>    - KafkaSink.builder().setTransactionalIdPrefix("my-trx-id-prefix")
>
>
>
> I’m trying to figure out several things.
>
>    1. Why are there two consumers in the pipeline? Why is one of them
>    hard-coded with read_uncommitted? Is it the reason that pipeline2 gets
>    duplicate events after pipeline1 is recovered?
>    2. Is the KafkaIO exactly-once implementation equivalent to the Flink
>    KafkaSink implementation?
>    3. Is there anything else that I missed? How to make it work in Beam?
>
>
>
> Thanks,
>
> Ifat
>
>
>
