Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-16 Thread David Morávek
Hello Sandeep,

Are you seeing any skew in your data (are the affected TMs receiving more data
than others)? How many partitions does your source topic have? That could
explain why some TMs have more work to perform.
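
For reference, a minimal sketch of checking both from Java — the broker, topic,
and consumer-group names below are hypothetical placeholders, and it assumes a
reasonably recent kafka-clients on the classpath:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionLagCheck {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    String bootstrap = "broker:9092";      // hypothetical broker
    String topic = "source-topic";         // hypothetical source topic
    String group = "beam-consumer-group";  // hypothetical consumer group id

    Properties adminProps = new Properties();
    adminProps.put("bootstrap.servers", bootstrap);

    try (AdminClient admin = AdminClient.create(adminProps)) {
      // Partition count of the source topic.
      TopicDescription desc =
          admin.describeTopics(Collections.singleton(topic)).all().get().get(topic);
      System.out.println("partitions = " + desc.partitions().size());

      // Committed offsets of the consumer group, per partition.
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

      // Log-end offsets via a throwaway consumer, then print per-partition lag.
      Properties consumerProps = new Properties();
      consumerProps.put("bootstrap.servers", bootstrap);
      consumerProps.put("group.id", group + "-lag-check");
      consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
      consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
      try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
        Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
        committed.forEach((tp, om) ->
            System.out.println(tp + " lag = " + (end.get(tp) - om.offset())));
      }
    }
  }
}

A per-partition lag that keeps growing only for some partitions would point at
the skew suspected above.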

Also, would it be possible to retry your test with the latest SDK?

D.

Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-15 Thread Eleanore Jin
Hi Sandeep,

As I am also exploring the Beam KafkaIO, I'd like to share some of my thoughts.
1. My understanding is that, in order to guarantee no message loss, you need to
use KafkaExactlyOnceSink [1], and it is not possible to relax this to
at-least-once with the current KafkaIO (see the sketch after this list).
2. When KafkaExactlyOnceSink is enabled, the checkpoint includes the offset
from the source and all the messages between the last checkpoint and the
current checkpoint.
3. Only after the checkpoint completes does KafkaExactlyOnceSink start
publishing the messages that have been checkpointed.
4. In case of a failure while publishing these messages, they are retried; a
sequenceId is assigned to each message to determine which messages were
published successfully and which ones still need to be retried. E.g. say
messages 5-10 are in the checkpoint and only 5 and 6 were published
successfully; then, on restart from the checkpoint, only 7 to 10 will be
published again.
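
As a minimal sketch of what enabling it looks like in the Java SDK — broker and
topic names below are hypothetical placeholders — KafkaIO.Write#withEOS is the
switch that routes writes through KafkaExactlyOnceSink [1] instead of the
default KafkaWriter [2]:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosPipelineSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();  // pipeline options omitted for brevity

    pipeline
        // Read KV<String, String> records from a (hypothetical) source topic.
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("input-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Write with the exactly-once sink: withEOS(numShards, sinkGroupId)
        // switches KafkaIO from the default KafkaWriter to KafkaExactlyOnceSink.
        .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("broker:9092")
            .withTopic("output-topic")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withEOS(1, "eos-sink-group-id"));

    pipeline.run();
  }
}

From what I recall, KafkaIO only allows the EOS sink on runners it knows can
provide the required checkpoint guarantees, so please treat this as a sketch
rather than a drop-in change.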

My question about your setup: if you just enable checkpointing and still use
the default KafkaWriter [2], and your application is stateless, then the only
state is the source offset. Consider the following scenario: a checkpoint
records offset 10 and succeeds, then the message at offset 10 fails to be
published and the job restarts; it resumes from the checkpoint and starts from
offset 11, so message 10 is lost.

Please correct me if I am missing anything.

Thanks a lot!
Eleanore

[1]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
[2]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin 
wrote:

>
> Hi Sandeep,
>
> Thanks a lot for the information! I am on a similar track, which requires
> scaling the stateless pipeline up/down from a savepoint.
>
> It’s good to learn from your experience.
>
> Thanks!
> Eleanore

Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-11 Thread Kathula, Sandeep
Hi Eleanore,
We are using at-least-once semantics when writing to Kafka. We are OK with
duplicate messages.
Thanks
Sandeep Kathula


Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Kathula, Sandeep
Hi Eleanore,
Our DAG:
Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
  -> Flat Map
  -> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
  -> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
  -> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
  -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
  -> Extract Events/Map/ParMultiDo(Anonymous)
  -> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
  -> IpEnrichEvent/ParMultiDo(GeoEnrichment)
  -> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
  -> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
  -> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
  -> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)


We read from and write to Kafka.

Thanks
Sandeep Kathula



Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Eleanore Jin
Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using
KafkaIO.write, but not with EOS (exactly-once semantics). From my
understanding, just enabling checkpointing will not be enough to guarantee no
message loss? I pasted part of my DAG with KafkaIO EOS enabled. I also read
from and write to Kafka with KafkaIO.

[image: image.png]
Thanks a lot!
Eleanore



Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Kathula, Sandeep
Hi Eleanore,
We are also observing that a few task managers are able to keep up with the
incoming load while a few others are lagging behind after starting from the
savepoint with lower parallelism. Not all task managers are affected by this
problem. We repeated this test multiple times to confirm.

Thanks
Sandeep Kathula

From: "Kathula, Sandeep" 
Reply-To: "user@beam.apache.org" 
Date: Monday, August 10, 2020 at 11:04 AM
To: "user@beam.apache.org" , "eleanore@gmail.com" 

Cc: "Vora, Jainik" , "Benenson, Mikhail" 
, "Deshpande, Omkar" , 
"LeVeck, Matt" 
Subject: Re: Beam flink runner job not keeping up with input rate after 
downscaling

This email is from an external sender.

Hi Eleanore,
Our DAG:
Source: Strip Metadata/EventBusIO.Read/Read Bytes From 
Kafka/Read(KafkaUnboundedSource) -> Flat Map -> Strip 
Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) -> Strip 
Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor) -> Strip 
Metadata/MapElements/Map/ParMultiDo(Anonymous) -> Filter Unreadeable 
Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) -> Extract 
Events/Map/ParMultiDo(Anonymous) -> 
UaEnrichEvent/ParMultiDo(UserAgentEnrichment) -> 
IpEnrichEvent/ParMultiDo(GeoEnrichment) -> Keyless 
Write/MapElements/Map/ParMultiDo(Anonymous) -> Keyless 
Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter) -> Keyless 
Write/EventBusIO.Write/KafkaIO.Write/Kafka 
ProducerRecord/Map/ParMultiDo(Anonymous) -> Keyless 
Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)


We read from and write to kafka.

Thanks
Sandeep Kathula


From: Eleanore Jin 
Reply-To: "user@beam.apache.org" 
Date: Monday, August 10, 2020 at 10:31 AM
To: "user@beam.apache.org" 
Subject: Re: Beam flink runner job not keeping up with input rate after 
downscaling

This email is from an external sender.

Hi Sandeep,

Can you please share your DAG? Is your job read and write to some sink?

Thanks a lot!

On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep 
mailto:sandeep_kath...@intuit.com>> wrote:
Hi,
   We started a Beam application with Flink runner with parallelism as 50. It 
is a stateless application.  With initial parallelism of 50, our application is 
able to process up to 50,000 records per second. After a week, we took a 
savepoint and restarted from savepoint with the parallelism of 18. We are 
seeing that our application is only able to process 7000 records per second but 
we expect it to process almost 18,000 records per second. Records processed per 
task manager was almost half of what is used to process previously with 50 task 
managers.

 When we started a new application with 18 pods without any savepoint, it is 
able to process ~18500 records per second. This problem occurs only when we 
downscale after taking a savepoint. We ported same application to simple Flink 
application without Apache Beam, and there it scales well without any issues 
after restarting from savepoint with less parallelism.  So the problem should 
be with Apache Beam or some config we are passing to Beam/Flink. We are using 
the following config:

numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true


We didn’t give any maxParallelism in our Beam application but just specifying 
parallelism.

Beam version - 2.19
Flink version- 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula




Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Eleanore Jin
Hi Sandeep,

Can you please share your DAG? Does your job read from a source and write to some sink?

Thanks a lot!



Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Kathula, Sandeep
Hi,
   We started a Beam application on the Flink runner with a parallelism of 50.
It is a stateless application. With the initial parallelism of 50, our
application is able to process up to 50,000 records per second. After a week,
we took a savepoint and restarted from it with a parallelism of 18. We are
seeing that our application is only able to process 7,000 records per second,
but we expect it to process almost 18,000 records per second. Records
processed per task manager are almost half of what each task manager used to
process with 50 task managers.

 When we started a new application with 18 pods without any savepoint, it is
able to process ~18,500 records per second. This problem occurs only when we
downscale after taking a savepoint. We ported the same application to a plain
Flink application without Apache Beam, and there it scales well without any
issues after restarting from a savepoint with lower parallelism. So the
problem should be with Apache Beam or some config we are passing to
Beam/Flink. We are using the following config:

numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true


We didn't set maxParallelism in our Beam application; we only specified
parallelism.
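
For completeness, a minimal sketch of setting it explicitly alongside the
options above — the class name and the value 50 are hypothetical, and it
assumes the Beam Flink runner version in use exposes maxParallelism as a
pipeline option:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchWithExplicitMaxParallelism {
  public static void main(String[] args) {
    // Typically launched with e.g.:
    //   --runner=FlinkRunner --parallelism=18 --numberOfExecutionRetries=2
    //   --externalizedCheckpointsEnabled=true
    //   --retainExternalizedCheckpointsOnCancellation=true
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);

    // Hypothetical value: pin the maximum parallelism (the number of Flink key
    // groups) so it stays the same when the job is later restored from a
    // savepoint with a different parallelism.
    options.setMaxParallelism(50);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the Kafka read -> enrich -> Kafka write pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}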

Beam version: 2.19
Flink version: 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula