Hi Sandeep,

Since I am also exploring Beam's KafkaIO, I'd like to share some of my
thoughts:
1. My understanding is that, in order to guarantee no message loss, you
need to use KafkaExactlyOnceSink [1]; with the current KafkaIO it is not
possible to relax this to at-least-once.
2. When KafkaExactlyOnceSink is enabled, each checkpoint includes the
offset from the source plus all the messages between the last checkpoint
and the current one.
3. Only after the checkpoint completes does KafkaExactlyOnceSink start
publishing the messages that have been checkpointed.
4. If publishing these messages fails, they are retried. A sequenceId is
assigned to each message to determine which messages were published
successfully and which ones need to be retried. For example, if messages
5 - 10 are in the checkpoint and only 5 and 6 were published successfully,
then on restart from the checkpoint only 7 - 10 are published again. (A
minimal sketch of enabling this sink is below.)
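
To make point 1 concrete, here is a minimal sketch, assuming an
illustrative broker address, topic names, shard count, and sink group id,
of turning on the EOS sink via KafkaIO.Write#withEOS (which routes writes
through KafkaExactlyOnceSink [1] instead of KafkaWriter [2]):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosSinkSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")            // illustrative
            .withTopic("input-topic")                       // illustrative
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())                             // yields KV<String, String>
        // ... stateless transforms would go here ...
        .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("broker:9092")
            .withTopic("output-topic")                      // illustrative
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // Publishes only checkpointed messages, using sequence ids
            // to skip those already written (points 2 - 4 above).
            .withEOS(5, "eos-sink-group"));                 // illustrative values

    p.run();
  }
}

As far as I understand, withEOS also requires support from the runner, so
it may not be available everywhere.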

My question for your setup: if you just enable checkpointing and still use
KafkaWriter [2], and your application is stateless, then the only state is
the source offset. Consider the following scenario: the checkpoint records
offset 10 and succeeds, then the message at offset 10 fails to be
published and the job restarts. It resumes from the checkpoint and starts
at offset 11, so message 10 is lost. (A sketch of this setup follows
below.)
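
For clarity, this is the setup I mean, as a minimal sketch with an
illustrative checkpoint interval: checkpointing enabled on the Flink
runner while the write path stays on the default, non-EOS KafkaWriter:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointOnlySketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Checkpoints cover the source offsets, but a plain KafkaIO.write()
    // (no withEOS) publishes records as they arrive, so a record can
    // fail to publish after its offset has already been checkpointed.
    options.setCheckpointingInterval(60_000L); // illustrative: every 60s

    // ... build and run a pipeline with plain KafkaIO.write() here ...
  }
}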

Please correct me if I am missing anything.

Thanks a lot!
Eleanore

[1]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
[2]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin <eleanore....@gmail.com>
wrote:

>
> Hi Sandeep,
>
> Thanks a lot for the information! I am on a similar track, which requires
> scaling the stateless pipeline up/down from a savepoint.
>
> It’s good to learn from your experience.
>
> Thanks!
> Eleanore
>
> On Tue, Aug 11, 2020 at 10:21 AM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi Eleanore,
>>
>> We are using at-least-once semantics when writing to Kafka. We are OK with
>> duplicate messages.
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>> *From: *Eleanore Jin <eleanore....@gmail.com>
>> *Date: *Monday, August 10, 2020 at 11:32 AM
>> *To: *"Kathula, Sandeep" <sandeep_kath...@intuit.com>
>> *Cc: *"user@beam.apache.org" <user@beam.apache.org>, "Vora, Jainik" <
>> jainik_v...@intuit.com>, "Benenson, Mikhail" <mikhail_benen...@intuit.com>,
>> "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "LeVeck, Matt" <
>> matt_lev...@intuit.com>
>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>> after downscaling
>>
>>
>>
>>
>>
>>
>> Hi Sandeep,
>>
>>
>>
>> Thanks a lot for sharing! On a separate note, I see you are using
>> KafkaIO.write, but not with EOS (exactly-once semantics). From my
>> understanding, just enabling checkpointing will not be enough to guarantee
>> no message loss? I pasted part of my DAG with KafkaIO EOS enabled. I also
>> read from and write to Kafka with KafkaIO.
>>
>>
>>
>> Thanks a lot!
>>
>> Eleanore
>>
>>
>>
>> On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <
>> sandeep_kath...@intuit.com> wrote:
>>
>> Hi Eleanore,
>>
>>                 We are also observing that a few task managers are able to
>> keep up with the incoming load, but a few task managers lag behind after
>> starting from the savepoint with lower parallelism. Not all task managers
>> are affected by this problem. We repeated this test multiple times to
>> confirm.
>>
>>
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>> *From: *"Kathula, Sandeep" <sandeep_kath...@intuit.com>
>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Date: *Monday, August 10, 2020 at 11:04 AM
>> *To: *"user@beam.apache.org" <user@beam.apache.org>, "
>> eleanore....@gmail.com" <eleanore....@gmail.com>
>> *Cc: *"Vora, Jainik" <jainik_v...@intuit.com>, "Benenson, Mikhail" <
>> mikhail_benen...@intuit.com>, "Deshpande, Omkar" <
>> omkar_deshpa...@intuit.com>, "LeVeck, Matt" <matt_lev...@intuit.com>
>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>> after downscaling
>>
>>
>>
>>
>>
>>
>> Hi Eleanore,
>>
>>                     Our DAG:
>>
>> Source: Strip Metadata/EventBusIO.Read/Read Bytes From
>> Kafka/Read(KafkaUnboundedSource) -> Flat Map -> Strip
>> Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) -> Strip
>> Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor) -> Strip
>> Metadata/MapElements/Map/ParMultiDo(Anonymous) -> Filter Unreadeable
>> Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Extract
>> Events/Map/ParMultiDo(Anonymous) ->
>> UaEnrichEvent/ParMultiDo(UserAgentEnrichment) ->
>> IpEnrichEvent/ParMultiDo(GeoEnrichment) -> Keyless
>> Write/MapElements/Map/ParMultiDo(Anonymous) -> Keyless
>> Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter) ->
>> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka
>> ProducerRecord/Map/ParMultiDo(Anonymous) -> Keyless
>> Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
>>
>>
>>
>>
>>
>>             We read from and write to *Kafka*.
>>
>>
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>>
>>
>> *From: *Eleanore Jin <eleanore....@gmail.com>
>> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Date: *Monday, August 10, 2020 at 10:31 AM
>> *To: *"user@beam.apache.org" <user@beam.apache.org>
>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>> after downscaling
>>
>>
>>
>>
>>
>>
>> Hi Sandeep,
>>
>>
>>
>> Can you please share your DAG? Does your job read from a source and write
>> to some sink?
>>
>>
>>
>> Thanks a lot!
>>
>>
>>
>> On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep <
>> sandeep_kath...@intuit.com> wrote:
>>
>> Hi,
>>
>>    We started a Beam application on the Flink runner with a parallelism of
>> 50. It is a *stateless application.* With the initial parallelism of 50,
>> our application is able to process up to *50,000 records* per second.
>> After a week, we took a savepoint and restarted from it with a
>> parallelism of *18.* We are seeing that our application is only able to
>> process *7,000* records per second, but we expect it to process almost
>> 18,000 records per second. The records processed per task manager are
>> almost *half* of what was processed previously with 50 task managers.
>>
>>
>>
>>  When we started a new application with 18 pods without any savepoint, it
>> is able to process ~18,500 records per second. This problem *occurs only
>> when we downscale after taking a savepoint*. We ported the same application
>> to a plain *Flink application without Apache Beam*, and there *it scales
>> well without any issues* after restarting from a savepoint with lower
>> parallelism. So the problem should be with Apache Beam or some config we
>> are passing to Beam/Flink. We are using the following config:
>>
>>
>>
>> numberOfExecutionRetries=2
>>
>> externalizedCheckpointsEnabled=true
>>
>> retainExternalizedCheckpointsOnCancellation=true
>>
>>
>>
>>
>>
>> We didn't give any maxParallelism in our Beam application, only the
>> parallelism; a minimal sketch of how these settings map to Beam's
>> FlinkPipelineOptions is below.
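>>
>> For reference, a minimal sketch, assuming the settings are passed
>> programmatically (values mirror the config above; the names are Beam's
>> FlinkPipelineOptions setters):
>>
>> import org.apache.beam.runners.flink.FlinkPipelineOptions;
>> import org.apache.beam.runners.flink.FlinkRunner;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>
>> FlinkPipelineOptions options =
>>     PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>> options.setRunner(FlinkRunner.class);
>> options.setNumberOfExecutionRetries(2);
>> options.setExternalizedCheckpointsEnabled(true);
>> options.setRetainExternalizedCheckpointsOnCancellation(true);
>> options.setParallelism(18);        // set explicitly
>> // options.setMaxParallelism(...); // left unset in our job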
>>
>>
>>
>> Beam version: 2.19
>>
>> Flink version: 1.9
>>
>>
>>
>> Any suggestions/help would be appreciated.
>>
>>
>>
>>
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>>
>>
>>
