Hi Jan,

Sorry for the late reply. My topic has 180 partitions. Do you mean I should
run with the parallelism set to 900?
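
For reference, here is a minimal sketch of the arithmetic behind that number,
assuming the suggestion is to multiply a topic's partition count by 5. The
helper name is hypothetical and not part of Beam; the result would then be
passed to the runner, presumably through the Flink runner's --parallelism
pipeline option.

```python
# Hypothetical helper, not part of Beam: scale a partition count by the
# factor suggested in the thread ("say 5 times higher").
def suggested_parallelism(partitions: int, factor: int = 5) -> int:
    if partitions <= 0 or factor <= 0:
        raise ValueError("partitions and factor must be positive")
    return partitions * factor


# Partition counts mentioned in the thread: 180 (busiest topic), 12 (quietest).
for partitions in (180, 12):
    print(partitions, "partitions ->", suggested_parallelism(partitions))
```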

Thanks.

Antonio.

On 2020/12/23 20:30:34, Jan Lukavský <je...@seznam.cz> wrote: 
> OK,
> 
> could you make an experiment and increase the parallelism to something 
> significantly higher than the total number of partitions? Say 5 times 
> higher? Would that have an impact on throughput in your case?
> 
> Jan
> 
> On 12/23/20 7:03 PM, Antonio Si wrote:
> > Hi Jan,
> >
> > The performance data that I reported was run with parallelism = 8. We also 
> > ran with parallelism = 15 and we observed similar behaviors although I 
> > don't have the exact numbers. I can get you the numbers if needed.
> >
> > Regarding number of partitions, since we have multiple topics, the number 
> > of partitions varies from 180 to 12. The highest TPS topic has 180 
> > partitions, while the lowest TPS topic has 12 partitions.
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Antonio,
> >>
> >> can you please clarify a few things:
> >>
> >>    a) what parallelism you use for your sources
> >>
> >>    b) how many partitions there are in your topic(s)
> >>
> >> Thanks,
> >>
> >>    Jan
> >>
> >> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>> Hi Boyuan,
> >>>
> >>> Let me clarify, I have tried with and without using 
> >>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>
> >>> -  with --experiments=use_deprecated_read --fasterCopy=true, I am able
> >>> to achieve 13K TPS
> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
> >>> I am able to achieve 10K TPS
> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>
> >>> In our test case, we have multiple topics, and the checkpoint interval is
> >>> 60s. Some topics have much higher traffic than others. We looked a little
> >>> at the case with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>> --fasterCopy=true. Based on our observation, each consumer poll() in
> >>> ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for a topic with
> >>> high traffic, it will continue in the loop because every poll() will
> >>> return some records. Every poll() returns about 200 records, so it takes
> >>> about 0.8ms for every 200 records. I am not sure if that is part of the
> >>> reason for the performance difference.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:
> >>>> Hi Antonio,
> >>>>
> >>>> Thanks for the data point. That's very valuable information!
> >>>>
> >>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>
> >>>>> We measured the number of Kafka messages that we can process per
> >>>>> second. With Beam v2.26 with --experiments=use_deprecated_read and
> >>>>> --fasterCopy=true, we are able to consume 13K messages per second, but
> >>>>> with Beam v2.26 without the use_deprecated_read option, we are only
> >>>>> able to process 10K messages per second for the same pipeline.
> >>>> We do have an SDF implementation of Kafka Read instead of using the
> >>>> wrapper. Would you like to give it a try to see whether it helps improve
> >>>> your situation? You can use --experiments=beam_fn_api,use_sdf_kafka_read
> >>>> to switch to the Kafka SDF Read.
> >>>>
> >>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang <boyu...@google.com> wrote:
> >>>>
> >>>>> Hi Jan,
> >>>>>> it seems that what we would want is to couple the lifecycle of the
> >>>>>> Reader not with the restriction but with the particular instance of
> >>>>>> (Un)boundedSource (after being split). That could be done in the
> >>>>>> processing DoFn, if it contained a cache mapping instance of the source
> >>>>>> to the (possibly null - i.e. not yet open) reader. In @NewTracker we
> >>>>>> could assign (or create) the reader to the tracker, as the tracker is
> >>>>>> created for each restriction.
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>> I was thinking about this but it seems like it is not applicable to the
> >>>>> way UnboundedSource and UnboundedReader work together.
> >>>>> Please correct me if I'm wrong. The UnboundedReader is created from
> >>>>> UnboundedSource per CheckpointMark[1], which means for certain sources,
> >>>>> the CheckpointMark could affect some attributes like the start position
> >>>>> of the reader when resuming. So a single UnboundedSource could be mapped
> >>>>> to multiple readers because of different instances of CheckpointMark.
> >>>>> That's also the reason why we use CheckpointMark as the restriction.
> >>>>>
> >>>>> Please let me know if I misunderstand your suggestion.
> >>>>>
> >>>>> [1]
> >>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
> >>>>>
> >>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Sorry for my late reply. I was off for a few days.
> >>>>>>
> >>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>
> >>>>>> We measured the number of Kafka messages that we can process per
> >>>>>> second. With Beam v2.26 with --experiments=use_deprecated_read and
> >>>>>> --fasterCopy=true, we are able to consume 13K messages per second, but
> >>>>>> with Beam v2.26 without the use_deprecated_read option, we are only
> >>>>>> able to process 10K messages per second for the same pipeline.
> >>>>>>
> >>>>>> Thanks and regards,
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the details! Which version of Beam SDK are you using? And
> >>>>>>> are you using --experiments=beam_fn_api with DirectRunner to launch
> >>>>>>> your pipeline?
> >>>>>>>
> >>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> >>>>>>> topic+partition as input element and a KafkaConsumer will be assigned
> >>>>>>> to this topic+partition then poll records continuously. The Kafka
> >>>>>>> consumer will resume reading and return from the process fn when
> >>>>>>>
> >>>>>>>      - There are no available records currently (this is a feature of
> >>>>>>>      SDF which calls SDF self-initiated checkpoint)
> >>>>>>>      - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
> >>>>>>>      checkpoint request to ReadFromKafkaDoFn for getting partial
> >>>>>>>      results. The checkpoint frequency for DirectRunner is every 100
> >>>>>>>      output records or every 1 second.
> >>>>>>>
> >>>>>>> It seems like either the self-initiated checkpoint or the
> >>>>>>> DirectRunner-issued checkpoint gives you the performance regression,
> >>>>>>> since there is overhead when rescheduling residuals. In your case, it's
> >>>>>>> more likely that the checkpoint behavior of
> >>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
> >>>>>>> elements a batch. I want to understand what kind of performance
> >>>>>>> regression you are noticing. Is it slower to output the same amount of
> >>>>>>> records?
> >>>>>>>
> >>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
> >>>>>>>> Hi Boyuan,
> >>>>>>>>
> >>>>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on
> >>>>>>>> the slack channel a few days ago.
> >>>>>>>>
> >>>>>>>> I am not sure if this is helpful, but I have been doing some
> >>>>>>>> debugging on the SDK KafkaIO performance issue for our pipeline and I
> >>>>>>>> would like to provide some observations.
> >>>>>>>>
> >>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was
> >>>>>>>> invoked within the same thread and every time kafkaconsumer.poll() is
> >>>>>>>> called, it returns some records, from 1 up to 200 records. So, it
> >>>>>>>> will proceed to run the pipeline steps. Each kafkaconsumer.poll()
> >>>>>>>> takes about 0.8ms. So, in this case, the polling and running of the
> >>>>>>>> pipeline are executed sequentially within a single thread. So, after
> >>>>>>>> processing a batch of records, it will need to wait for 0.8ms before
> >>>>>>>> it can process the next batch of records again.
> >>>>>>>>
> >>>>>>>> Any suggestions would be appreciated.
> >>>>>>>>
> >>>>>>>> Hope that helps.
> >>>>>>>>
> >>>>>>>> Thanks and regards,
> >>>>>>>>
> >>>>>>>> Antonio.
> >>>>>>>>
> >>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for
> >>>>>>>>> tracking.
> >>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com>
> >>>>>>>>> wrote:
> >>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution
> >>>>>>>>>> paths for UnboundedSource and SDF wrapper are different. It's
> >>>>>>>>>> highly possible that the regression either comes from the
> >>>>>>>>>> invocation path for SDF wrapper, or the implementation of SDF
> >>>>>>>>>> wrapper itself.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1]
> >>>>>>>>>>> yesterday that they were seeing significantly reduced performance
> >>>>>>>>>>> using KafkaIO.Read w/ the SDF wrapper vs the unbounded source.
> >>>>>>>>>>> They mentioned they were using flink 1.9.
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Hi Steve,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think the major performance regression comes from
> >>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which
> >>>>>>>>>>>> will checkpoint the DoFn based on time/output limit and use
> >>>>>>>>>>>> timers/state to reschedule work.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1]
> >>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some
> >>>>>>>>>>>>> aggregation, and writes to various places. Previously, in older
> >>>>>>>>>>>>> versions of beam, when running this in the DirectRunner, messages
> >>>>>>>>>>>>> would go through the pipeline almost instantly, making it very
> >>>>>>>>>>>>> easy to debug locally, etc.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it could
> >>>>>>>>>>>>> take on the order of 5-10 minutes for messages to get from the
> >>>>>>>>>>>>> pubsub read step to the next step in the pipeline (deserializing
> >>>>>>>>>>>>> them, etc). The subscription being read from has on the order of
> >>>>>>>>>>>>> 100,000 elements/sec arriving in it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes
> >>>>>>>>>>>>> the pipeline behave as it did before.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here
> >>>>>>>>>>>>> is causing some kind of issue, either buffering a very large
> >>>>>>>>>>>>> amount of data before emitting it in a bundle, or something else.
> >>>>>>>>>>>>> Has anyone else run into this?
> >>>>>>>>>>>>>
> 
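
As a rough sanity check on the poll() timings quoted above (about 0.8 ms per
poll, about 200 records per poll), the sketch below estimates how much of each
thread's time the polling itself would account for. It assumes every poll
returns a full 200 records and that the load is spread evenly over the
threads, neither of which the thread confirms; the function names are
hypothetical.

```python
def poll_only_ceiling(records_per_poll: float, poll_seconds: float) -> float:
    """Records/sec a single thread could fetch if it did nothing but poll."""
    return records_per_poll / poll_seconds


def poll_time_fraction(observed_tps: float, records_per_poll: float,
                       poll_seconds: float, threads: int = 1) -> float:
    """Fraction of each thread's wall-clock time spent inside poll()."""
    polls_per_second_per_thread = observed_tps / threads / records_per_poll
    return polls_per_second_per_thread * poll_seconds


# Figures reported in the thread: 0.8 ms per poll, ~200 records per poll,
# 10K TPS observed with parallelism = 8.
ceiling = poll_only_ceiling(200, 0.0008)
fraction = poll_time_fraction(10_000, 200, 0.0008, threads=8)
print(f"poll-only ceiling per thread: {ceiling:,.0f} records/s")
print(f"time spent polling at 10K TPS: {fraction:.2%}")
```

Under these assumptions the poll-only ceiling is on the order of 250,000
records/s per thread, well above the observed 10K TPS, so the 0.8 ms poll
latency on its own would probably not explain the gap.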
