Hi Jan,

Sorry for the late reply. My topic has 180 partitions. Do you mean running with the parallelism set to 900?

Thanks.

Antonio.
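For reference, a minimal sketch of raising the source parallelism on the Flink runner, assuming the standard FlinkPipelineOptions surface (the value 900 is simply 5x the 180 partitions mentioned above; everything else here is illustrative, not taken from the pipeline under discussion):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HighParallelismPipeline {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // 5x the highest partition count (180), per the suggestion above.
        options.setParallelism(900);

        Pipeline p = Pipeline.create(options);
        // ... KafkaIO.read() and the rest of the pipeline ...
        p.run();
      }
    }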

On 2020/12/23 20:30:34, Jan Lukavský <je...@seznam.cz> wrote:
> OK,
>
> could you make an experiment and increase the parallelism to something significantly higher than the total number of partitions? Say 5 times higher? Would that have an impact on throughput in your case?
>
> Jan
>
> On 12/23/20 7:03 PM, Antonio Si wrote:
> > Hi Jan,
> >
> > The performance data that I reported was run with parallelism = 8. We also ran with parallelism = 15 and we observed similar behavior, although I don't have the exact numbers. I can get you the numbers if needed.
> >
> > Regarding the number of partitions, since we have multiple topics, the number of partitions varies from 180 to 12. The highest-TPS topic has 180 partitions, while the lowest-TPS topic has 12 partitions.
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Antonio,
> >>
> >> can you please clarify a few things:
> >>
> >>   a) what parallelism you use for your sources
> >>
> >>   b) how many partitions there are in your topic(s)
> >>
> >> Thanks,
> >>
> >> Jan
> >>
> >> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>> Hi Boyuan,
> >>>
> >>> Let me clarify, I have tried with and without using the --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>
> >>>   - with --experiments=use_deprecated_read --fasterCopy=true, I am able to achieve 13K TPS
> >>>   - with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true, I am able to achieve 10K TPS
> >>>   - with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>
> >>> In our test case, we have multiple topics and the checkpoint interval is 60s. Some topics have much higher traffic than others. We looked at the case with the --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true options a little. Based on our observations, each consumer poll() in ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topics with high traffic, it will continue in the loop because every poll() will return some records. Every poll returns about 200 records, so it takes about 0.8ms for every 200 records. I am not sure if that is part of the reason for the performance difference.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:
> >>>> Hi Antonio,
> >>>>
> >>>> Thanks for the data point. That's very valuable information!
> >>>>
> >>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>
> >>>>> We measured the number of Kafka messages that we can process per second. With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true, we are able to consume 13K messages per second, but with Beam v2.26 without the use_deprecated_read option, we are only able to process 10K messages per second for the same pipeline.
> >>>>
> >>>> We do have an SDF implementation of Kafka Read instead of using the wrapper. Would you like to give it a try to see whether it helps improve your situation? You can use --experiments=beam_fn_api,use_sdf_kafka_read to switch to the Kafka SDF Read.
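For reference, a minimal sketch of how the three configurations compared above could be supplied as pipeline arguments (the experiment and fasterCopy spellings are taken from the messages above; the surrounding scaffolding is illustrative only):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadTranslationConfigs {
      public static void main(String[] args) {
        // Configuration 1: UnboundedSource translation (13K TPS in the numbers above).
        String[] deprecatedRead = {
          "--runner=FlinkRunner", "--experiments=use_deprecated_read", "--fasterCopy=true"
        };
        // Configuration 2: Kafka SDF read (10K TPS above).
        String[] sdfKafkaRead = {
          "--runner=FlinkRunner", "--experiments=beam_fn_api,use_sdf_kafka_read", "--fasterCopy=true"
        };
        // Configuration 3: default read translation (5K TPS above).
        String[] defaultRead = {"--runner=FlinkRunner", "--fasterCopy=true"};

        // Pick one of the three argument sets; --fasterCopy requires the Flink runner on the classpath.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(deprecatedRead).create();
        Pipeline p = Pipeline.create(options);
        // ... KafkaIO.read() and the rest of the pipeline ...
        p.run().waitUntilFinish();
      }
    }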

> >>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang <boyu...@google.com> wrote:
> >>>>
> >>>>> Hi Jan,
> >>>>>
> >>>>>> it seems that what we would want is to couple the lifecycle of the Reader not with the restriction but with the particular instance of (Un)boundedSource (after being split). That could be done in the processing DoFn, if it contained a cache mapping an instance of the source to the (possibly null, i.e. not yet open) reader. In @NewTracker we could assign (or create) the reader to the tracker, as the tracker is created for each restriction.
> >>>>>>
> >>>>>> WDYT?
> >>>>>
> >>>>> I was thinking about this, but it seems like it is not applicable to the way UnboundedSource and UnboundedReader work together. Please correct me if I'm wrong. The UnboundedReader is created from the UnboundedSource per CheckpointMark [1], which means that for certain sources the CheckpointMark could affect some attributes, like the start position of the reader when resuming. So a single UnboundedSource could be mapped to multiple readers because of different instances of CheckpointMark. That's also the reason why we use CheckpointMark as the restriction.
> >>>>>
> >>>>> Please let me know if I misunderstand your suggestion.
> >>>>>
> >>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
> >>>>>
> >>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Sorry for my late reply. I was off for a few days.
> >>>>>>
> >>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>
> >>>>>> We measured the number of Kafka messages that we can process per second. With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true, we are able to consume 13K messages per second, but with Beam v2.26 without the use_deprecated_read option, we are only able to process 10K messages per second for the same pipeline.
> >>>>>>
> >>>>>> Thanks and regards,
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
> >>>>>>>
> >>>>>>> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition as its input element, a KafkaConsumer is assigned to this topic+partition, and records are then polled continuously. The Kafka consumer will stop reading and return from the process fn when:
> >>>>>>>
> >>>>>>>   - there are no available records currently (this is a feature of SDF known as an SDF self-initiated checkpoint), or
> >>>>>>>   - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
> >>>>>>>
> >>>>>>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint gives you the performance regression, since there is overhead when rescheduling residuals. In your case, it's more likely that the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200 elements a batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same amount of records?
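A rough sketch of the poll-then-checkpoint loop shape described above; this is heavily simplified and is not the actual ReadFromKafkaDoFn code (the element, restriction, and output types are illustrative assumptions):

    import java.time.Duration;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Simplified illustration only; restriction setup (@GetInitialRestriction etc.) is omitted.
    class SketchKafkaReadDoFn extends DoFn<TopicPartition, ConsumerRecord<byte[], byte[]>> {
      private transient Consumer<byte[], byte[]> consumer; // assigned to the topic+partition elsewhere

      @ProcessElement
      public ProcessContinuation processElement(
          @Element TopicPartition topicPartition,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<ConsumerRecord<byte[], byte[]>> receiver) {
        while (true) {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
          if (records.isEmpty()) {
            // Nothing available right now: self-initiated checkpoint, resume later.
            return ProcessContinuation.resume();
          }
          for (ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
            if (!tracker.tryClaim(record.offset())) {
              // The invoker asked for a checkpoint (e.g. every 100 records / 1 second on DirectRunner):
              // stop and let the runner reschedule the residual restriction.
              return ProcessContinuation.stop();
            }
            receiver.output(record);
          }
        }
      }
    }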

> >>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Boyuan,
> >>>>>>>>
> >>>>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.
> >>>>>>>>
> >>>>>>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations.
> >>>>>>>>
> >>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaconsumer.poll() is called it returns some records, from 1 up to 200 records. So, it will proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about 0.8ms. So, in this case, the polling and the running of the pipeline are executed sequentially within a single thread. So, after processing a batch of records, it will need to wait for 0.8ms before it can process the next batch of records again.
> >>>>>>>>
> >>>>>>>> Any suggestions would be appreciated.
> >>>>>>>>
> >>>>>>>> Hope that helps.
> >>>>>>>>
> >>>>>>>> Thanks and regards,
> >>>>>>>>
> >>>>>>>> Antonio.
> >>>>>>>>
> >>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> >>>>>>>>>
> >>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different. It's highly possible that the regression comes either from the invocation path for the SDF wrapper, or from the implementation of the SDF wrapper itself.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Steve,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
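A minimal, Beam-independent sketch of the kind of poll() latency measurement described above (the broker address, topic, and consumer group are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLatencyCheck {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "poll-latency-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
          for (int i = 0; i < 100; i++) {
            long start = System.nanoTime();
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.printf("poll #%d: %d records in %d us%n", i, records.count(), micros);
          }
        }
      }
    }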

> >>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
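For completeness, a sketch of enabling the same experiment programmatically rather than on the command line, assuming the core Java SDK's ExperimentalOptions (the pipeline contents are omitted):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UseDeprecatedReadLocally {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Equivalent to passing --experiments=use_deprecated_read on the command line.
        ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");

        Pipeline p = Pipeline.create(options);
        // ... PubsubIO / KafkaIO reads and the rest of the pipeline ...
        p.run().waitUntilFinish();
      }
    }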