Re: Beam flink runner job not keeping up with input rate after downscaling
Hello Sandeep,

Are you seeing any skew in your data (that is, are the affected TMs receiving more data than the others)? How many partitions does your source topic have? The partition count could explain why some TMs have more work to perform.

Also, would it be possible to retry your test with the latest SDK?

D.
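To make the skew question concrete, here is a minimal sketch of how an uneven division of source splits across workers could leave some workers with more input than others. The figure of 50 splits is purely an assumption for illustration (the thread does not state the topic's partition count), and the round-robin assignment is a simplification, not a description of how the Flink runner actually distributes work.

```python
from collections import Counter


def splits_per_worker(num_splits, num_workers):
    """Assign splits to workers round-robin and return the count per worker."""
    counts = [0] * num_workers
    for split in range(num_splits):
        counts[split % num_workers] += 1
    return counts


# Hypothetical numbers: 50 splits spread over 18 workers.
counts = splits_per_worker(50, 18)

# Maps "splits on a worker" -> "number of workers with that many splits".
distribution = Counter(counts)
print(dict(distribution))
```

Under these assumed numbers, most workers carry three splits while a few carry only two, so the load per worker differs by a factor of 1.5. Whether anything like this applies to the job in question depends on the real partition count.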
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Sandeep,

As I am also exploring the Beam KafkaIO, I would like to share some of my thoughts.

1. My understanding is that, in order to guarantee no message loss, you need to use KafkaExactlyOnceSink [1], and that it is not possible to relax this to at-least-once with the current KafkaIO.
2. When KafkaExactlyOnceSink is enabled, the checkpoint includes the source offset and all the messages between the last checkpoint and the current checkpoint.
3. Only after the checkpoint has completed does KafkaExactlyOnceSink start publishing the messages that have been checkpointed.
4. If publishing these messages fails, they are retried. A sequenceId is assigned to each message to determine which messages were published successfully and which need to be retried. For example, say messages 5 to 10 are in the checkpoint and only 5 and 6 were published successfully; after a restart from the checkpoint, only 7 to 10 are published again.

My question about your setup: if you just enable checkpointing and still use KafkaWriter [2], and your application is stateless, then the only state is the source offset. Consider the following scenario: the checkpoint records offset 10 and succeeds, then the message with offset 10 fails to be published and the job restarts. It resumes from the checkpoint and starts from offset 11, so message 10 is lost.

Please correct me if I am missing anything.

Thanks a lot!
Eleanore

[1] https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
[2] https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin wrote:
> Hi Sandeep,
>
> Thanks a lot for the information! I am on a similar track, which requires
> scaling the stateless pipeline up/down from a savepoint.
>
> It's good to learn from your experience.
>
> Thanks!
> Eleanore
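The sequenceId-based retry described in point 4 of the Aug 16 message can be sketched as a simple set difference. This is only a toy model of the idea as stated in the thread; the function name and data layout are hypothetical and do not reflect how KafkaExactlyOnceSink is actually implemented.

```python
def messages_to_republish(checkpointed_ids, published_ids):
    """Return the checkpointed sequence ids that were not yet published.

    The original order of the checkpointed ids is preserved.
    """
    already_published = set(published_ids)
    return [seq for seq in checkpointed_ids if seq not in already_published]


# The example from the message: 5..10 are in the checkpoint, 5 and 6 went out.
checkpointed = list(range(5, 11))
published = [5, 6]

pending = messages_to_republish(checkpointed, published)
print(pending)
```

With the numbers from the message, only messages 7 to 10 remain to be published after the restart.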
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Eleanore,

We are using at-least-once semantics when writing to Kafka. We are OK with duplicate messages.

Thanks
Sandeep Kathula
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Eleanore,

Our DAG:

Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
-> Flat Map
-> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
-> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
-> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
-> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
-> Extract Events/Map/ParMultiDo(Anonymous)
-> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
-> IpEnrichEvent/ParMultiDo(GeoEnrichment)
-> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
-> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
-> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
-> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)

We read from and write to Kafka.

Thanks
Sandeep Kathula
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using KafkaIO.write, but not with EOS (exactly-once semantics). From my understanding, just enabling checkpointing is not enough to guarantee no message loss, is it? I pasted part of my DAG with KafkaIO EOS enabled. I also read from and write to Kafka with KafkaIO.

[image: image.png]

Thanks a lot!
Eleanore
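The concern about message loss with checkpointing alone can be illustrated with a toy simulation. It assumes, purely for illustration, that the checkpoint stores the last source offset read regardless of whether that record was successfully published; this models the question being asked, not the verified behaviour of KafkaWriter or the Flink runner. All names are hypothetical.

```python
def delivered_offsets(offsets, checkpointed_offset, failing_offset):
    """Simulate one publish failure followed by a restart from the checkpoint.

    Toy model: the checkpoint holds the last offset read from the source,
    independent of whether that record reached the sink.
    """
    delivered = []
    # First attempt: publish records until the failing offset is reached.
    for offset in offsets:
        if offset == failing_offset:
            break  # publish fails here and the job restarts
        delivered.append(offset)
    # After the restart: resume right after the checkpointed offset.
    for offset in offsets:
        if offset > checkpointed_offset:
            delivered.append(offset)
    return delivered


offsets = list(range(8, 13))

# Offset 10 is checkpointed as read, but its publish fails.
delivered = delivered_offsets(offsets, checkpointed_offset=10, failing_offset=10)
lost = sorted(set(offsets) - set(delivered))
print(delivered, lost)

# Contrast: the checkpoint only covers offsets that were already published.
safe = delivered_offsets(offsets, checkpointed_offset=9, failing_offset=10)
```

In this model a record is lost only when the checkpointed offset runs ahead of what was actually published. If the checkpoint never advances past unpublished records, the failed record is read again after the restart.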
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Eleanore,

We are also observing that a few task managers are able to keep up with the incoming load, but a few task managers are lagging behind after starting from the savepoint with lower parallelism. Not all task managers are affected by this problem. We repeated this test multiple times to confirm.

Thanks
Sandeep Kathula
Re: Beam flink runner job not keeping up with input rate after downscaling
Hi Sandeep,

Can you please share your DAG? Does your job read from and write to some sink?

Thanks a lot!
Beam flink runner job not keeping up with input rate after downscaling
Hi,

We started a Beam application on the Flink runner with a parallelism of 50. It is a stateless application. With the initial parallelism of 50, our application is able to process up to 50,000 records per second. After a week, we took a savepoint and restarted from it with a parallelism of 18. We are seeing that our application is only able to process 7,000 records per second, but we expect it to process almost 18,000 records per second. The number of records processed per task manager was almost half of what each one used to process previously with 50 task managers.

When we started a new application with 18 pods without any savepoint, it was able to process ~18,500 records per second. This problem occurs only when we downscale after taking a savepoint. We ported the same application to a simple Flink application without Apache Beam, and there it scales well without any issues after restarting from a savepoint with lower parallelism. So the problem should be with Apache Beam or with some config we are passing to Beam/Flink. We are using the following config:

numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true

We did not set maxParallelism in our Beam application; we only specify parallelism.

Beam version - 2.19
Flink version - 1.9

Any suggestions/help would be appreciated.

Thanks
Sandeep Kathula
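The throughput expectation in the original post can be checked with a short calculation. It assumes throughput scales linearly with parallelism and that one unit of parallelism corresponds to one task manager, as the post seems to imply; the variable names are illustrative only.

```python
baseline_rate = 50_000        # records/s at parallelism 50 (from the post)
baseline_parallelism = 50
new_parallelism = 18
observed_rate = 7_000         # records/s after restoring from the savepoint

# Per-worker throughput before downscaling.
per_worker_baseline = baseline_rate / baseline_parallelism

# Expected total throughput if each worker keeps its previous rate.
expected_rate = per_worker_baseline * new_parallelism

# Per-worker throughput actually observed, and its share of the baseline.
per_worker_observed = observed_rate / new_parallelism
ratio = per_worker_observed / per_worker_baseline

print(per_worker_baseline, expected_rate, round(per_worker_observed), round(ratio, 2))
```

Under these assumptions each worker handled about 1,000 records per second before downscaling, which gives the expected 18,000 records per second at parallelism 18. The observed 7,000 records per second works out to roughly 389 records per second per worker, or about 39% of the earlier per-worker rate, which is somewhat below the "almost half" mentioned in the post.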