Hi,

I proposed making the runner-issued checkpoint frequency configurable by the
pipeline author here:
https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing.
I believe it will also help with this performance issue. Please feel
free to drop any comments there : )
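
For illustration, here is a minimal sketch of what the configuration surface
could look like (the interface and option names below are hypothetical
placeholders, not the actual proposal; the defaults just echo DirectRunner's
current 100-records/1-second behavior mentioned downthread):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical options interface; names and defaults are placeholders.
    public interface CheckpointFrequencyOptions extends PipelineOptions {
      @Description("Max output records before the runner issues a checkpoint.")
      @Default.Integer(100)
      int getMaxOutputsPerCheckpoint();
      void setMaxOutputsPerCheckpoint(int value);

      @Description("Max time in millis before the runner issues a checkpoint.")
      @Default.Long(1000L)
      long getMaxCheckpointIntervalMillis();
      void setMaxCheckpointIntervalMillis(long value);
    }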

On Wed, Jan 6, 2021 at 1:14 AM Jan Lukavský <je...@seznam.cz> wrote:

> Sorry for the typo in your name. :-)
>
> On 1/6/21 10:11 AM, Jan Lukavský wrote:
> > Hi Antonie,
> >
> > yes, for instance. I'd just like to rule out the possibility that a
> > single DoFn processing multiple partitions (restrictions) brings some
> > overhead in your case.
> >
> > Jan
> >
> > On 12/31/20 10:36 PM, Antonio Si wrote:
> >> Hi Jan,
> >>
> >> Sorry for the late reply. My topic has 180 partitions. Do you mean
> >> running with parallelism set to 900?
> >>
> >> Thanks.
> >>
> >> Antonio.
> >>
> >> On 2020/12/23 20:30:34, Jan Lukavský <je...@seznam.cz> wrote:
> >>> OK,
> >>>
> >>> could you run an experiment and increase the parallelism to something
> >>> significantly higher than the total number of partitions? Say 5 times
> >>> higher? Would that have an impact on throughput in your case?
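> >>>
> >>> (With FlinkRunner that would just be the standard parallelism option,
> >>> e.g. for 180 partitions something like
> >>>
> >>>     --runner=FlinkRunner --parallelism=900
> >>>
> >>> assuming you configure the job's parallelism via the pipeline options.)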
> >>>
> >>> Jan
> >>>
> >>> On 12/23/20 7:03 PM, Antonio Si wrote:
> >>>> Hi Jan,
> >>>>
> >>>> The performance data that I reported was run with parallelism = 8.
> >>>> We also ran with parallelism = 15 and observed similar behavior,
> >>>> although I don't have the exact numbers. I can get you the numbers
> >>>> if needed.
> >>>>
> >>>> Regarding the number of partitions: since we have multiple topics,
> >>>> the number of partitions varies from 12 to 180. The highest-TPS
> >>>> topic has 180 partitions, while the lowest-TPS topic has 12.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> Antonio.
> >>>>
> >>>> On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Hi Antonio,
> >>>>>
> >>>>> can you please clarify a few things:
> >>>>>
> >>>>>     a) what parallelism you use for your sources
> >>>>>
> >>>>>     b) how many partitions there are in your topic(s)
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>>     Jan
> >>>>>
> >>>>> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Let me clarify: I have tried both with and without the
> >>>>>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>>>>
> >>>>>> -  with --experiments=use_deprecated_read --fasterCopy=true, I am
> >>>>>> able to achieve 13K TPS
> >>>>>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>>>>> --fasterCopy=true, I am able to achieve 10K TPS
> >>>>>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>>>>
> >>>>>> In our test case, we have multiple topics and the checkpoint
> >>>>>> interval is 60s. Some topics have much higher traffic than others.
> >>>>>> We looked a little at the case with the
> >>>>>> --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true
> >>>>>> options. Based on our observation, each consumer poll() in
> >>>>>> ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for a
> >>>>>> high-traffic topic, the DoFn keeps looping because every poll()
> >>>>>> returns some records, about 200 per poll. In other words, it
> >>>>>> spends about 0.8ms for every 200 records. I am not sure if that is
> >>>>>> part of the reason for the performance difference.
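> >>>>>>
> >>>>>> As a rough sanity check on those numbers: 0.8ms / 200 records is
> >>>>>> about 4us per record, i.e. poll() alone would sustain roughly
> >>>>>> 200 / 0.0008s = 250K records/s on a single thread, so the poll
> >>>>>> latency by itself may not fully account for the 13K vs 10K TPS gap.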
> >>>>>>
> >>>>>> Thanks.
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the data point. That's very valuable information!
> >>>>>>>
> >>>>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>>> We measured the number of Kafka messages that we can process per
> >>>>>>>> second. With Beam v2.26 with --experiments=use_deprecated_read
> >>>>>>>> and --fasterCopy=true, we are able to consume 13K messages per
> >>>>>>>> second, but with Beam v2.26 without the use_deprecated_read
> >>>>>>>> option, we are only able to process 10K messages per second for
> >>>>>>>> the same pipeline.
> >>>>>>> We do have an SDF implementation of Kafka Read that does not go
> >>>>>>> through the wrapper. Would you like to give it a try to see
> >>>>>>> whether it improves your situation? You can use
> >>>>>>> --experiments=beam_fn_api,use_sdf_kafka_read to switch to the
> >>>>>>> Kafka SDF Read.
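> >>>>>>>
> >>>>>>> For concreteness, the three configurations compared in this
> >>>>>>> thread would then be (flags exactly as reported above):
> >>>>>>>
> >>>>>>>     --runner=FlinkRunner --fasterCopy=true
> >>>>>>>     --runner=FlinkRunner --fasterCopy=true \
> >>>>>>>         --experiments=use_deprecated_read
> >>>>>>>     --runner=FlinkRunner --fasterCopy=true \
> >>>>>>>         --experiments=beam_fn_api,use_sdf_kafka_read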
> >>>>>>>
> >>>>>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang
> >>>>>>> <boyu...@google.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Jan,
> >>>>>>>>> it seems that what we would want is to couple the lifecycle of
> >>>>>>>>> the Reader not with the restriction but with the particular
> >>>>>>>>> instance of (Un)boundedSource (after being split). That could
> >>>>>>>>> be done in the processing DoFn, if it contained a cache mapping
> >>>>>>>>> each source instance to its (possibly null, i.e. not yet open)
> >>>>>>>>> reader. In @NewTracker we could assign (or create) the reader
> >>>>>>>>> to the tracker, as the tracker is created for each restriction.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>> I was thinking about this, but it seems it is not applicable to
> >>>>>>>> the way UnboundedSource and UnboundedReader work together.
> >>>>>>>> Please correct me if I'm wrong. The UnboundedReader is created
> >>>>>>>> from the UnboundedSource per CheckpointMark[1], which means that
> >>>>>>>> for certain sources the CheckpointMark can affect attributes
> >>>>>>>> such as the start position of the reader when resuming. So a
> >>>>>>>> single UnboundedSource can be mapped to multiple readers because
> >>>>>>>> of different instances of CheckpointMark. That's also the reason
> >>>>>>>> why we use the CheckpointMark as the restriction.
> >>>>>>>>
> >>>>>>>> Please let me know if I misunderstand your suggestion.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
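> >>>>>>>>
> >>>>>>>> For reference, the factory method in [1] is (generics
> >>>>>>>> abbreviated):
> >>>>>>>>
> >>>>>>>>     // One reader per checkpoint mark; a null mark means
> >>>>>>>>     // starting fresh rather than resuming.
> >>>>>>>>     public abstract UnboundedReader<OutputT> createReader(
> >>>>>>>>         PipelineOptions options,
> >>>>>>>>         @Nullable CheckpointMarkT checkpointMark)
> >>>>>>>>         throws IOException;
> >>>>>>>>
> >>>>>>>> which is why a single source instance can yield many readers,
> >>>>>>>> each resuming from a different position.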
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si
> >>>>>>>> <antonio...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Boyuan,
> >>>>>>>>>
> >>>>>>>>> Sorry for my late reply. I was off for a few days.
> >>>>>>>>>
> >>>>>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>>>>
> >>>>>>>>> We measured the number of Kafka messages that we can process
> >>>>>>>>> per second. With Beam v2.26 with
> >>>>>>>>> --experiments=use_deprecated_read and --fasterCopy=true, we are
> >>>>>>>>> able to consume 13K messages per second, but with Beam v2.26
> >>>>>>>>> without the use_deprecated_read option, we are only able to
> >>>>>>>>> process 10K messages per second for the same pipeline.
> >>>>>>>>>
> >>>>>>>>> Thanks and regards,
> >>>>>>>>>
> >>>>>>>>> Antonio.
> >>>>>>>>>
> >>>>>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> >>>>>>>>>> Hi Antonio,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the details! Which version of the Beam SDK are you
> >>>>>>>>>> using? And are you using --experiments=beam_fn_api with
> >>>>>>>>>> DirectRunner to launch your pipeline?
> >>>>>>>>>>
> >>>>>>>>>> For ReadFromKafkaDoFn.processElement(): it takes a Kafka
> >>>>>>>>>> topic+partition as its input element, a KafkaConsumer is
> >>>>>>>>>> assigned to this topic+partition, and it then polls records
> >>>>>>>>>> continuously. The Kafka consumer stops reading and returns
> >>>>>>>>>> from the process fn (to be resumed later) when:
> >>>>>>>>>>
> >>>>>>>>>>       - there are no records currently available (this is the
> >>>>>>>>>>       SDF self-initiated checkpoint), or
> >>>>>>>>>>       - the OutputAndTimeBoundedSplittableProcessElementInvoker
> >>>>>>>>>>       issues a checkpoint request to ReadFromKafkaDoFn to get
> >>>>>>>>>>       partial results; for DirectRunner, the checkpoint
> >>>>>>>>>>       frequency is every 100 output records or every 1 second
> >>>>>>>>>>       (see the sketch below).
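> >>>>>>>>>>
> >>>>>>>>>> As a minimal sketch of that return-vs-resume pattern (this is
> >>>>>>>>>> not the actual ReadFromKafkaDoFn code; types are simplified,
> >>>>>>>>>> and imports plus the consumer field are elided):
> >>>>>>>>>>
> >>>>>>>>>>     @ProcessElement
> >>>>>>>>>>     public ProcessContinuation processElement(
> >>>>>>>>>>         @Element TopicPartition topicPartition,
> >>>>>>>>>>         RestrictionTracker<OffsetRange, Long> tracker,
> >>>>>>>>>>         OutputReceiver<ConsumerRecord<byte[], byte[]>> receiver) {
> >>>>>>>>>>       // 'consumer' is the KafkaConsumer assigned to this
> >>>>>>>>>>       // topic+partition.
> >>>>>>>>>>       while (true) {
> >>>>>>>>>>         ConsumerRecords<byte[], byte[]> records =
> >>>>>>>>>>             consumer.poll(pollTimeout);
> >>>>>>>>>>         if (records.isEmpty()) {
> >>>>>>>>>>           // No records available: SDF self-initiated
> >>>>>>>>>>           // checkpoint; the runner will call us again later.
> >>>>>>>>>>           return ProcessContinuation.resume();
> >>>>>>>>>>         }
> >>>>>>>>>>         for (ConsumerRecord<byte[], byte[]> record : records) {
> >>>>>>>>>>           if (!tracker.tryClaim(record.offset())) {
> >>>>>>>>>>             // A runner-issued checkpoint (e.g. the
> >>>>>>>>>>             // 100-records/1-second limit above) truncated the
> >>>>>>>>>>             // restriction; stop and let the runner reschedule
> >>>>>>>>>>             // the residual.
> >>>>>>>>>>             return ProcessContinuation.stop();
> >>>>>>>>>>           }
> >>>>>>>>>>           receiver.output(record);
> >>>>>>>>>>         }
> >>>>>>>>>>       }
> >>>>>>>>>>     }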
> >>>>>>>>>>
> >>>>>>>>>> It seems like either the self-initiated checkpoint or the
> >>>>>>>>>> DirectRunner-issued checkpoint is causing the performance
> >>>>>>>>>> regression, since there is overhead when rescheduling
> >>>>>>>>>> residuals. In your case, it looks like the checkpoint behavior
> >>>>>>>>>> of OutputAndTimeBoundedSplittableProcessElementInvoker gives
> >>>>>>>>>> you batches of 200 elements. I want to understand what kind of
> >>>>>>>>>> performance regression you are noticing: is it slower to
> >>>>>>>>>> output the same amount of records?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si
> >>>>>>>>>> <antonio...@gmail.com> wrote:
> >>>>>>>>>>> Hi Boyuan,
> >>>>>>>>>>>
> >>>>>>>>>>> This is Antonio. I reported the KafkaIO.read() performance
> >>>>>>>>>>> issue on the slack channel a few days ago.
> >>>>>>>>>>>
> >>>>>>>>>>> I am not sure if this is helpful, but I have been doing some
> >>>>>>>>>>> debugging on the SDK KafkaIO performance issue for our
> >>>>>>>>>>> pipeline and I would like to provide some observations.
> >>>>>>>>>>>
> >>>>>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement()
> >>>>>>>>>>> was invoked within the same thread, and every time
> >>>>>>>>>>> kafkaConsumer.poll() is called it returns some records, from
> >>>>>>>>>>> 1 up to 200, after which the pipeline steps run. Each
> >>>>>>>>>>> kafkaConsumer.poll() takes about 0.8ms. So, in this case, the
> >>>>>>>>>>> polling and the running of the pipeline are executed
> >>>>>>>>>>> sequentially within a single thread: after processing a batch
> >>>>>>>>>>> of records, it needs to wait about 0.8ms before it can
> >>>>>>>>>>> process the next batch of records.
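> >>>>>>>>>>>
> >>>>>>>>>>> Schematically, the per-thread cycle I am observing looks like
> >>>>>>>>>>> this (pseudocode of the behavior, not actual Beam code):
> >>>>>>>>>>>
> >>>>>>>>>>>     while (running) {
> >>>>>>>>>>>       // ~0.8ms per call, returns 1-200 records
> >>>>>>>>>>>       records = kafkaConsumer.poll(timeout);
> >>>>>>>>>>>       for (record : records) {
> >>>>>>>>>>>         runPipelineSteps(record);  // same thread, so the next
> >>>>>>>>>>>       }                            // poll() waits on this loop
> >>>>>>>>>>>     }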
> >>>>>>>>>>>
> >>>>>>>>>>> Any suggestions would be appreciated.
> >>>>>>>>>>>
> >>>>>>>>>>> Hope that helps.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks and regards,
> >>>>>>>>>>>
> >>>>>>>>>>> Antonio.
> >>>>>>>>>>>
> >>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for
> >>>>>>>>>>>> tracking.
> >>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang
> >>>>>>>>>>>> <boyu...@google.com> wrote:
> >>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The
> >>>>>>>>>>>>> execution paths for the UnboundedSource and the SDF wrapper
> >>>>>>>>>>>>> are different. It's highly possible that the regression
> >>>>>>>>>>>>> comes either from the invocation path of the SDF wrapper or
> >>>>>>>>>>>>> from the implementation of the SDF wrapper itself.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz
> >>>>>>>>>>>>> <sniem...@apache.org> wrote:
> >>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned
> >>>>>>>>>>>>>> [1] yesterday that they were seeing significantly reduced
> >>>>>>>>>>>>>> performance using KafkaIO.Read w/ the SDF wrapper vs the
> >>>>>>>>>>>>>> unbounded source. They mentioned they were using Flink 1.9.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang
> >>>>>>>>>>>>>> <boyu...@google.com> wrote:
> >>>>>>>>>>>>>>> Hi Steve,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think the major performance regression comes from
> >>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
> >>>>>>>>>>>>>>> which checkpoints the DoFn based on a time/output limit
> >>>>>>>>>>>>>>> and uses timers/state to reschedule work.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz
> >>>>>>>>>>>>>>> <sniem...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some
> >>>>>>>>>>>>>>>> aggregation, and writes to various places.  Previously,
> >>>>>>>>>>>>>>>> in older versions of Beam, when running this in the
> >>>>>>>>>>>>>>>> DirectRunner, messages would go through the pipeline
> >>>>>>>>>>>>>>>> almost instantly, making it very easy to debug locally,
> >>>>>>>>>>>>>>>> etc.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> However, after upgrading to Beam 2.25, I noticed that it
> >>>>>>>>>>>>>>>> could take on the order of 5-10 minutes for messages to
> >>>>>>>>>>>>>>>> get from the pubsub read step to the next step in the
> >>>>>>>>>>>>>>>> pipeline (deserializing them, etc).  The subscription
> >>>>>>>>>>>>>>>> being read from has on the order of 100,000 elements/sec
> >>>>>>>>>>>>>>>> arriving in it.  Setting --experiments=use_deprecated_read
> >>>>>>>>>>>>>>>> fixes it, and makes the pipeline behave as it did before.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner
> >>>>>>>>>>>>>>>> here is causing some kind of issue, either buffering a
> >>>>>>>>>>>>>>>> very large amount of data before emitting it in a
> >>>>>>>>>>>>>>>> bundle, or something else.  Has anyone else run into
> >>>>>>>>>>>>>>>> this?
> >>>>>>>>>>>>>>>>
>
