Hi Steve,

We have a wrapper optimization [1] merged, and it will be released with 2.27.0. Would you like to verify whether it helps improve the performance on DirectRunner?

[1] https://github.com/apache/beam/pull/13592
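A minimal sketch of how the two read paths can be toggled for such a comparison (the class name is a placeholder and pipeline construction is omitted; only the runner and the experiment flag are taken from this thread):

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadPathComparison {
      public static void main(String[] args) {
        // Path A: the default SDF wrapper around UnboundedSource
        // (includes the optimization from the PR above as of 2.27.0).
        PipelineOptions wrapperPath =
            PipelineOptionsFactory.fromArgs("--runner=DirectRunner").create();

        // Path B: the legacy UnboundedSource evaluation, for comparison.
        PipelineOptions legacyPath =
            PipelineOptionsFactory.fromArgs("--runner=DirectRunner").create();
        ExperimentalOptions.addExperiment(
            legacyPath.as(ExperimentalOptions.class), "use_deprecated_read");

        // Build and run the same pipeline once with each options object,
        // then compare element throughput and latency (construction omitted).
      }
    }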
On Mon, Dec 28, 2020 at 12:17 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi Antonio,

Thanks for the data! I want to elaborate more on where the overhead could come from when on Flink.

> with --experiments=use_deprecated_read --fasterCopy=true, I am able to achieve 13K TPS

This execution uses the UnboundedSource path, where the checkpoint frequency for source reading is the same as the Flink checkpoint interval; in your case, every 60 s. Flink reschedules the checkpoint marks for processing by reading them from state. Thus the overhead here could be the time for executing source.getCheckpointMark + reading from/writing to state + the overhead of Flink checkpoint execution.

> with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true, I am able to achieve 10K

This execution uses the Kafka SDF implementation, where the checkpoint frequency is configured as every 10,000 elements or every 10 seconds by the OutputAndTimeBoundedSplittableProcessElementInvoker. You mentioned that every poll takes about 0.8 ms and returns roughly 200 elements, so the 10,000-element limit is hit first, after about 50 polls (roughly every 40 ms of polling). The residuals can come from runner-issued checkpoints or SDF self-checkpoints; Flink reschedules them using timers and state. Thus the overhead here could be the time for scheduling timers + reading from/writing to state. I would expect to see improvements if we made the checkpoint interval much longer (for example, every 60 s or every 15,000 elements).

> with --fasterCopy=true alone, I am only able to achieve 5K TPS

This execution uses the UnboundedSourceAsSDFWrapperFn path, where the checkpoint frequency is also every 10,000 elements or every 10 seconds. Flink likewise reschedules the residuals using timers and state. So the overhead here could be the time for scheduling timers + reading from/writing to state + the overhead of the wrapper around the unbounded source.
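Back-of-envelope arithmetic for the cadences described above, using only the numbers reported in this thread (illustrative, not a measurement):

    public class CheckpointCadence {
      public static void main(String[] args) {
        double pollMillis = 0.8;          // observed duration of one consumer.poll()
        int recordsPerPoll = 200;         // observed records returned per poll
        int elementLimit = 10_000;        // invoker's output-count checkpoint limit
        double timeLimitMillis = 10_000;  // invoker's time checkpoint limit
        double flinkIntervalMillis = 60_000; // Flink checkpoint interval in this setup

        // Time to hit the element limit: 10,000 / 200 = 50 polls => ~40 ms.
        double millisToElementLimit =
            elementLimit / (double) recordsPerPoll * pollMillis;
        double sdfCadenceMillis = Math.min(millisToElementLimit, timeLimitMillis);

        System.out.printf("SDF paths checkpoint roughly every %.0f ms%n", sdfCadenceMillis);
        System.out.printf("UnboundedSource path checkpoints every %.0f ms%n",
            flinkIntervalMillis);
        // => ~1500x more frequent checkpoints on the SDF paths, which is
        //    where the timer/state rescheduling overhead accumulates.
      }
    }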
On Wed, Dec 23, 2020 at 12:30 PM Jan Lukavský <je...@seznam.cz> wrote:

OK,

could you make an experiment and increase the parallelism to something significantly higher than the total number of partitions? Say 5 times higher? Would that have an impact on throughput in your case?

Jan

On 12/23/20 7:03 PM, Antonio Si wrote:

Hi Jan,

The performance data that I reported was run with parallelism = 8. We also ran with parallelism = 15 and observed similar behavior, although I don't have the exact numbers. I can get you the numbers if needed.

Regarding the number of partitions: since we have multiple topics, the number of partitions varies from 12 to 180. The highest-TPS topic has 180 partitions, while the lowest-TPS topic has 12 partitions.

Thanks.

Antonio.

On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:

Hi Antonio,

can you please clarify a few things:

a) what parallelism you use for your sources

b) how many partitions there are in your topic(s)

Thanks,

Jan

On 12/22/20 10:07 PM, Antonio Si wrote:

Hi Boyuan,

Let me clarify. I have tried with and without the --experiments=beam_fn_api,use_sdf_kafka_read option:

- with --experiments=use_deprecated_read --fasterCopy=true, I am able to achieve 13K TPS
- with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true, I am able to achieve 10K
- with --fasterCopy=true alone, I am only able to achieve 5K TPS

In our test case we have multiple topics, and the checkpoint interval is 60 s. Some topics have much higher traffic than others. We looked a little at the case with the --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true options. Based on our observation, each consumer poll() in ReadFromKafkaDoFn.processElement() takes about 0.8 ms. For a high-traffic topic it will keep going in the loop, because every poll() returns some records, about 200 per poll; so it spends roughly 0.8 ms per 200 records. I am not sure whether that is part of the reason for the performance difference.

Thanks.

Antonio.

On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:

Hi Antonio,

Thanks for the data point. That's very valuable information!

> I didn't use DirectRunner. I am using FlinkRunner. We measured the number of Kafka messages that we can process per second. With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true, we are able to consume 13K messages per second, but with Beam v2.26 without the use_deprecated_read option, we are only able to process 10K messages per second for the same pipeline.

We do have an SDF implementation of Kafka read, separate from the wrapper. Would you like to try it to see whether it improves your situation? You can use --experiments=beam_fn_api,use_sdf_kafka_read to switch to the Kafka SDF read.

On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang <boyu...@google.com> wrote:

Hi Jan,

> it seems that what we would want is to couple the lifecycle of the Reader not with the restriction but with the particular instance of (Un)boundedSource (after being split). That could be done in the processing DoFn, if it contained a cache mapping instance of the source to the (possibly null - i.e. not yet open) reader. In @NewTracker we could assign (or create) the reader to the tracker, as the tracker is created for each restriction.
>
> WDYT?

I was thinking about this, but it seems it is not applicable to the way UnboundedSource and UnboundedReader work together. Please correct me if I'm wrong. The UnboundedReader is created from the UnboundedSource per CheckpointMark [1], which means that for certain sources the CheckpointMark can affect attributes of the reader, such as the start position when resuming. So a single UnboundedSource could be mapped to multiple readers because of different CheckpointMark instances. That is also the reason why we use the CheckpointMark as the restriction.

Please let me know if I misunderstand your suggestion.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
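For reference, the API shape behind [1], reduced to a sketch (generic placeholders only; no concrete source or mark type is implied):

    import java.io.IOException;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class ReaderPerCheckpointMark {
      // Illustrates the coupling described above: createReader() takes the
      // CheckpointMark, and the mark can determine reader state such as the
      // position to resume from, so one source yields many distinct readers.
      static <T, M extends UnboundedSource.CheckpointMark> void illustrate(
          UnboundedSource<T, M> source, PipelineOptions options, M mark)
          throws IOException {
        UnboundedReader<T> fresh = source.createReader(options, null);   // fresh read
        UnboundedReader<T> resumed = source.createReader(options, mark); // resumed read
      }
    }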
On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:

Hi Boyuan,

Sorry for my late reply. I was off for a few days.

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can process per second. With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true, we are able to consume 13K messages per second, but with Beam v2.26 without the use_deprecated_read option, we are only able to process 10K messages per second for the same pipeline.

Thanks and regards,

Antonio.

On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:

Hi Antonio,

Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?

ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its input element; a KafkaConsumer is assigned to that topic+partition and then polls records continuously. The consumer stops reading and returns from the process fn when:

- there are currently no available records (this is an SDF feature: the SDF self-initiated checkpoint), or
- the OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every second.

It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint gives you the performance regression, since there is overhead when rescheduling residuals. In your case, it is more likely the checkpoint behavior of the OutputAndTimeBoundedSplittableProcessElementInvoker, which gives you 200 elements per batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same amount of records?
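A simplified sketch of the loop just described (a method fragment, not the real ReadFromKafkaDoFn; consumerFor(...) is a hypothetical helper, the restriction handling is reduced to its essentials, and the Kafka client and Beam SDF imports are assumed):

    // Assumes org.apache.kafka.clients.consumer.* and Beam's SDF types.
    @ProcessElement
    public ProcessContinuation processElement(
        @Element TopicPartition topicPartition,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<ConsumerRecord<byte[], byte[]>> receiver) {
      Consumer<byte[], byte[]> consumer = consumerFor(topicPartition); // hypothetical helper
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) {
          // No records available: SDF self-initiated checkpoint; hand the
          // remainder back to the runner and resume later.
          return ProcessContinuation.resume();
        }
        for (ConsumerRecord<byte[], byte[]> record : records) {
          if (!tracker.tryClaim(record.offset())) {
            // Runner-issued checkpoint (on DirectRunner: every 100 records
            // or every second): stop here; the runner reschedules the residual.
            return ProcessContinuation.stop();
          }
          receiver.output(record);
        }
      }
    }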
On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:

Hi Boyuan,

This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.

I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline, and I would like to share some observations.

It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within a single thread, and every time KafkaConsumer.poll() is called it returns some records, from 1 up to 200, after which the pipeline steps run. Each KafkaConsumer.poll() takes about 0.8 ms. So the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records it needs to wait about 0.8 ms before it can process the next batch.

Any suggestions would be appreciated.

Hope that helps.

Thanks and regards,

Antonio.
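Putting those numbers in proportion (illustrative arithmetic only; assumes the 10K TPS, 200-record batch, and 0.8 ms poll figures reported above):

    public class PollOverhead {
      public static void main(String[] args) {
        double pollMillis = 0.8;   // observed per poll()
        int batch = 200;           // records per poll
        double observedTps = 10_000;

        // Poll-only ceiling: 200 records / 0.8 ms = 250,000 records/s.
        double pollCeilingTps = batch / pollMillis * 1000;
        // At 10K TPS, one 200-record cycle takes 20 ms,
        // so the 0.8 ms poll is ~4% of each cycle.
        double cycleMillis = batch / observedTps * 1000;
        double pollShare = pollMillis / cycleMillis;

        System.out.printf("poll-only ceiling: %.0f TPS%n", pollCeilingTps);
        System.out.printf("poll share of each cycle at %.0f TPS: %.1f%%%n",
            observedTps, pollShare * 100);
      }
    }

If these figures hold, the sequential poll by itself accounts for only a few percent of each cycle, which would point back at the checkpoint/rescheduling overhead discussed above rather than the polling.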
On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:

Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.

On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:

Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different. It's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.

On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:

Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.

[1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900

On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi Steve,

I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which checkpoints the DoFn based on time/output limits and uses timers/state to reschedule work.

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:

I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of Beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.

However, after upgrading to Beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.

Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.

It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
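The time/output-bounded checkpointing that Boyuan identifies above can be modeled as a toy class (illustrative only; the real logic lives in OutputAndTimeBoundedSplittableProcessElementInvoker, and the limits differ by runner):

    import java.time.Duration;
    import java.time.Instant;

    // Toy model of time/output-bounded checkpointing; not Beam code.
    class OutputAndTimeBound {
      private final int maxOutputs;   // e.g. 100 on DirectRunner, 10,000 elsewhere
      private final Duration maxTime; // e.g. 1 s on DirectRunner, 10 s elsewhere
      private int outputs;
      private final Instant start = Instant.now();

      OutputAndTimeBound(int maxOutputs, Duration maxTime) {
        this.maxOutputs = maxOutputs;
        this.maxTime = maxTime;
      }

      // Called once per emitted element; a true result is where the invoker
      // would split the restriction and reschedule the residual via
      // timers and state, which is the overhead discussed in this thread.
      boolean shouldCheckpoint() {
        outputs++;
        return outputs >= maxOutputs
            || Duration.between(start, Instant.now()).compareTo(maxTime) >= 0;
      }
    }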