Hi, I proposed making the runner-issued checkpoint frequency configurable by the pipeline author here: https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing. I believe it will also be helpful for the performance issue. Please feel free to drop any comments there : )
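For readers skimming the thread below: the checkpoint policy under discussion (the runner asks the splittable DoFn to checkpoint after a fixed number of output records or after a time limit, whichever comes first) can be sketched in stand-alone form. This is an illustrative sketch only, with invented names, not Beam code:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.stream.IntStream;

// Stand-alone sketch (names invented, not Beam code) of a count/time-bounded
// checkpoint policy: emit elements until either the output limit or the time
// limit is hit, then stop so the remaining work can be rescheduled as a
// residual restriction.
public class CheckpointPolicySketch {
  static int emitUntilCheckpoint(int maxOutputs, Duration maxTime, Iterator<Integer> records) {
    Instant deadline = Instant.now().plus(maxTime);
    int emitted = 0;
    while (records.hasNext()) {
      records.next(); // stand-in for outputting one element downstream
      emitted++;
      if (emitted >= maxOutputs || !Instant.now().isBefore(deadline)) {
        break; // checkpoint: remaining input becomes a residual to reschedule
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    // An effectively unbounded stream: the output limit (100) is hit first,
    // mirroring the "every 100 output records or every 1 second" behavior
    // described downthread for the DirectRunner.
    Iterator<Integer> unbounded = IntStream.iterate(0, i -> i + 1).iterator();
    System.out.println(emitUntilCheckpoint(100, Duration.ofSeconds(1), unbounded));
  }
}
```

The overhead being debated in the thread is what happens after each such break: rescheduling the residual costs time, so a lower checkpoint frequency means fewer, larger batches.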
On Wed, Jan 6, 2021 at 1:14 AM Jan Lukavský <je...@seznam.cz> wrote:

> Sorry for the typo in your name. :-)
>
> On 1/6/21 10:11 AM, Jan Lukavský wrote:
> > Hi Antonie,
> >
> > yes, for instance. I'd just like to rule out the possibility that a
> > single DoFn processing multiple partitions (restrictions) brings some
> > overhead in your case.
> >
> > Jan
> >
> > On 12/31/20 10:36 PM, Antonio Si wrote:
> >> Hi Jan,
> >>
> >> Sorry for the late reply. My topic has 180 partitions. Do you mean
> >> run with a parallelism set to 900?
> >>
> >> Thanks.
> >>
> >> Antonio.
> >>
> >> On 2020/12/23 20:30:34, Jan Lukavský <je...@seznam.cz> wrote:
> >>> OK,
> >>>
> >>> could you make an experiment and increase the parallelism to something
> >>> significantly higher than the total number of partitions? Say 5 times
> >>> higher? Would that have an impact on throughput in your case?
> >>>
> >>> Jan
> >>>
> >>> On 12/23/20 7:03 PM, Antonio Si wrote:
> >>>> Hi Jan,
> >>>>
> >>>> The performance data that I reported was run with parallelism = 8.
> >>>> We also ran with parallelism = 15 and we observed similar behaviors,
> >>>> although I don't have the exact numbers. I can get you the numbers
> >>>> if needed.
> >>>>
> >>>> Regarding the number of partitions, since we have multiple topics,
> >>>> the number of partitions varies from 180 to 12. The highest-TPS
> >>>> topic has 180 partitions, while the lowest-TPS topic has 12
> >>>> partitions.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> Antonio.
> >>>>
> >>>> On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Hi Antonio,
> >>>>>
> >>>>> can you please clarify a few things:
> >>>>>
> >>>>> a) what parallelism you use for your sources
> >>>>>
> >>>>> b) how many partitions there are in your topic(s)
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jan
> >>>>>
> >>>>> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Let me clarify, I have tried with and without using the
> >>>>>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>>>>
> >>>>>> - with --experiments=use_deprecated_read --fasterCopy=true, I
> >>>>>>   am able to achieve 13K TPS
> >>>>>> - with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>>>>>   --fasterCopy=true, I am able to achieve 10K TPS
> >>>>>> - with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>>>>
> >>>>>> In our test case, we have multiple topics and the checkpoint
> >>>>>> interval is 60s. Some topics have much higher traffic than others.
> >>>>>> We looked at the case with the
> >>>>>> --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true
> >>>>>> options a little. Based on our observation, each consumer poll()
> >>>>>> in ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for a
> >>>>>> topic with high traffic, it will continue in the loop because
> >>>>>> every poll() will return some records. Every poll returns about
> >>>>>> 200 records. So, it takes about 0.8ms for every 200 records. I am
> >>>>>> not sure if that is part of the reason for the performance.
> >>>>>>
> >>>>>> Thanks.
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the data point. That's very valuable information!
> >>>>>>>
> >>>>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>>> We measured the number of Kafka messages that we can process
> >>>>>>>> per second.
> >>>>>>>> With Beam v2.26 with --experiments=use_deprecated_read and
> >>>>>>>> --fasterCopy=true, we are able to consume 13K messages per
> >>>>>>>> second, but with Beam v2.26 without the use_deprecated_read
> >>>>>>>> option, we are only able to process 10K messages per second for
> >>>>>>>> the same pipeline.
> >>>>>>>
> >>>>>>> We do have an SDF implementation of Kafka Read instead of using
> >>>>>>> the wrapper. Would you like to have a try to see whether it helps
> >>>>>>> you improve your situation? You can use
> >>>>>>> --experiments=beam_fn_api,use_sdf_kafka_read to switch to the
> >>>>>>> Kafka SDF Read.
> >>>>>>>
> >>>>>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang
> >>>>>>> <boyu...@google.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Jan,
> >>>>>>>>
> >>>>>>>>> it seems that what we would want is to couple the lifecycle of
> >>>>>>>>> the Reader not with the restriction but with the particular
> >>>>>>>>> instance of (Un)boundedSource (after being split). That could
> >>>>>>>>> be done in the processing DoFn, if it contained a cache mapping
> >>>>>>>>> an instance of the source to the (possibly null - i.e. not yet
> >>>>>>>>> open) reader. In @NewTracker we could assign (or create) the
> >>>>>>>>> reader to the tracker, as the tracker is created for each
> >>>>>>>>> restriction.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>> I was thinking about this, but it seems like it is not
> >>>>>>>> applicable to the way UnboundedSource and UnboundedReader work
> >>>>>>>> together. Please correct me if I'm wrong. The UnboundedReader is
> >>>>>>>> created from the UnboundedSource per CheckpointMark[1], which
> >>>>>>>> means for certain sources, the CheckpointMark could affect some
> >>>>>>>> attributes like the start position of the reader when resuming.
> >>>>>>>> So a single UnboundedSource could be mapped to multiple readers
> >>>>>>>> because of different instances of CheckpointMark. That's also
> >>>>>>>> the reason why we use the CheckpointMark as the restriction.
> >>>>>>>>
> >>>>>>>> Please let me know if I misunderstand your suggestion.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
> >>>>>>>>
> >>>>>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si
> >>>>>>>> <antonio...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Boyuan,
> >>>>>>>>>
> >>>>>>>>> Sorry for my late reply. I was off for a few days.
> >>>>>>>>>
> >>>>>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>>>>
> >>>>>>>>> We measured the number of Kafka messages that we can process
> >>>>>>>>> per second. With Beam v2.26 with
> >>>>>>>>> --experiments=use_deprecated_read and --fasterCopy=true, we are
> >>>>>>>>> able to consume 13K messages per second, but with Beam v2.26
> >>>>>>>>> without the use_deprecated_read option, we are only able to
> >>>>>>>>> process 10K messages per second for the same pipeline.
> >>>>>>>>>
> >>>>>>>>> Thanks and regards,
> >>>>>>>>>
> >>>>>>>>> Antonio.
> >>>>>>>>>
> >>>>>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>>> Hi Antonio,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the details! Which version of Beam SDK are you
> >>>>>>>>>> using? And are you using --experiments=beam_fn_api with
> >>>>>>>>>> DirectRunner to launch your pipeline?
> >>>>>>>>>>
> >>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> >>>>>>>>>> topic+partition as an input element, and a KafkaConsumer will
> >>>>>>>>>> be assigned to this topic+partition, then poll records
> >>>>>>>>>> continuously.
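Jan's cache idea quoted above can be sketched generically. This is a hypothetical illustration (all names invented, not the Beam API) of a DoFn-level map from a source instance to an already-open reader:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the cache Jan describes above (names invented, not
// the Beam API): keep at most one reader per source instance, so a new
// restriction can reuse an already-open reader instead of opening a fresh
// one. Boyuan's reply explains why this does not fit UnboundedSource
// directly: there, a reader is created per CheckpointMark, and the
// CheckpointMark can change reader state such as the start position.
public class ReaderCacheSketch<S, R> {
  private final Map<S, R> openReaders = new HashMap<>();
  private final Function<S, R> openReader;

  public ReaderCacheSketch(Function<S, R> openReader) {
    this.openReader = openReader;
  }

  // Would be called from @NewTracker in Jan's proposal: open the reader
  // lazily the first time a source is seen, then reuse it afterwards.
  public R readerFor(S source) {
    return openReaders.computeIfAbsent(source, openReader);
  }
}
```

With this sketch, two restrictions derived from the same split source would share one reader; the objection in the thread is that for UnboundedSource the source-to-reader mapping is not one-to-one.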
> >>>>>>>>>> The Kafka consumer will resume reading and return from the
> >>>>>>>>>> process fn when
> >>>>>>>>>>
> >>>>>>>>>> - there are no available records currently (this is a feature
> >>>>>>>>>>   of SDF, which calls SDF self-initiated checkpoint), or
> >>>>>>>>>> - the OutputAndTimeBoundedSplittableProcessElementInvoker
> >>>>>>>>>>   issues a checkpoint request to ReadFromKafkaDoFn to get
> >>>>>>>>>>   partial results. The checkpoint frequency for DirectRunner
> >>>>>>>>>>   is every 100 output records or every 1 second.
> >>>>>>>>>>
> >>>>>>>>>> It seems like either the self-initiated checkpoint or the
> >>>>>>>>>> DirectRunner-issued checkpoint gives you the performance
> >>>>>>>>>> regression, since there is overhead when rescheduling
> >>>>>>>>>> residuals. In your case, it's more like the checkpoint
> >>>>>>>>>> behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> >>>>>>>>>> gives you 200 elements a batch. I want to understand what kind
> >>>>>>>>>> of performance regression you are noticing: is it slower to
> >>>>>>>>>> output the same amount of records?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si
> >>>>>>>>>> <antonio...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Boyuan,
> >>>>>>>>>>>
> >>>>>>>>>>> This is Antonio. I reported the KafkaIO.read() performance
> >>>>>>>>>>> issue on the slack channel a few days ago.
> >>>>>>>>>>>
> >>>>>>>>>>> I am not sure if this is helpful, but I have been doing some
> >>>>>>>>>>> debugging on the SDK KafkaIO performance issue for our
> >>>>>>>>>>> pipeline and I would like to provide some observations.
> >>>>>>>>>>>
> >>>>>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement()
> >>>>>>>>>>> was invoked within the same thread, and every time
> >>>>>>>>>>> kafkaconsumer.poll() is called, it returns some records, from
> >>>>>>>>>>> 1 up to 200 records. So, it will proceed to run the pipeline
> >>>>>>>>>>> steps. Each kafkaconsumer.poll() takes about 0.8ms. So, in
> >>>>>>>>>>> this case, the polling and running of the pipeline are
> >>>>>>>>>>> executed sequentially within a single thread. So, after
> >>>>>>>>>>> processing a batch of records, it will need to wait for 0.8ms
> >>>>>>>>>>> before it can process the next batch of records again.
> >>>>>>>>>>>
> >>>>>>>>>>> Any suggestions would be appreciated.
> >>>>>>>>>>>
> >>>>>>>>>>> Hope that helps.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks and regards,
> >>>>>>>>>>>
> >>>>>>>>>>> Antonio.
> >>>>>>>>>>>
> >>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for
> >>>>>>>>>>>> tracking.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang
> >>>>>>>>>>>> <boyu...@google.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The
> >>>>>>>>>>>>> execution paths for UnboundedSource and the SDF wrapper are
> >>>>>>>>>>>>> different. It's highly possible that the regression comes
> >>>>>>>>>>>>> either from the invocation path for the SDF wrapper, or
> >>>>>>>>>>>>> from the implementation of the SDF wrapper itself.
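To put Antonio's numbers above in perspective: with poll() and processing serialized on one thread, each ~200-record batch pays the 0.8 ms poll cost before the next batch can start. A back-of-the-envelope helper (illustrative only, names invented):

```java
// Back-of-the-envelope helper for the observation above: when poll() and
// pipeline processing run sequentially on one thread, each batch costs the
// poll time plus the per-record processing time, which bounds throughput.
public class PollBoundSketch {
  static double maxRecordsPerSec(double pollMs, int recordsPerPoll, double processMsPerRecord) {
    double batchMs = pollMs + recordsPerPoll * processMsPerRecord;
    return recordsPerPoll / batchMs * 1000.0;
  }

  public static void main(String[] args) {
    // 0.8 ms per poll(), ~200 records per poll, processing cost ignored:
    // poll overhead alone caps one thread at about 250,000 records/sec.
    // The ~10K TPS reported in the thread therefore suggests most of the
    // time is spent elsewhere (processing or checkpoint/rescheduling
    // overhead), not in the poll wait itself.
    System.out.println(maxRecordsPerSec(0.8, 200, 0.0));
  }
}
```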
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz
> >>>>>>>>>>>>> <sniem...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned
> >>>>>>>>>>>>>> [1] yesterday that they were seeing significantly reduced
> >>>>>>>>>>>>>> performance using KafkaIO.Read w/ the SDF wrapper vs the
> >>>>>>>>>>>>>> unbounded source. They mentioned they were using flink 1.9.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang
> >>>>>>>>>>>>>> <boyu...@google.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Steve,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think the major performance regression comes from
> >>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
> >>>>>>>>>>>>>>> which will checkpoint the DoFn based on a time/output
> >>>>>>>>>>>>>>> limit and use timers/state to reschedule work.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz
> >>>>>>>>>>>>>>> <sniem...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some
> >>>>>>>>>>>>>>>> aggregation, and writes to various places. Previously,
> >>>>>>>>>>>>>>>> in older versions of beam, when running this in the
> >>>>>>>>>>>>>>>> DirectRunner, messages would go through the pipeline
> >>>>>>>>>>>>>>>> almost instantly, making it very easy to debug locally,
> >>>>>>>>>>>>>>>> etc.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it
> >>>>>>>>>>>>>>>> could take on the order of 5-10 minutes for messages to
> >>>>>>>>>>>>>>>> get from the pubsub read step to the next step in the
> >>>>>>>>>>>>>>>> pipeline (deserializing them, etc). The subscription
> >>>>>>>>>>>>>>>> being read from has on the order of 100,000 elements/sec
> >>>>>>>>>>>>>>>> arriving in it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and
> >>>>>>>>>>>>>>>> makes the pipeline behave as it did before.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner
> >>>>>>>>>>>>>>>> here is causing some kind of issue, either buffering a
> >>>>>>>>>>>>>>>> very large amount of data before emitting it in a
> >>>>>>>>>>>>>>>> bundle, or something else. Has anyone else run into
> >>>>>>>>>>>>>>>> this?