Hi Steve,

We have one wrapper optimization[1] merged, and it will be released with
2.27.0. Would you like to verify whether it helps improve the
performance on the DirectRunner?

[1] https://github.com/apache/beam/pull/13592
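
In case it helps with that comparison, here is a minimal, illustrative sketch
(not the exact benchmark pipeline) of a KafkaIO read whose execution path can
be flipped via the flags discussed in this thread; the topic and bootstrap
servers are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaReadCheck {
  public static void main(String[] args) {
    // By default KafkaIO.read() goes through the SDF wrapper
    // (UnboundedSourceAsSDFWrapperFn); pass
    // "--experiments=use_deprecated_read" to compare against the old
    // UnboundedSource path discussed in this thread.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();

    Pipeline p = Pipeline.create(options);
    p.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("localhost:9092") // placeholder
            .withTopic("test-topic")                // placeholder
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class));
    p.run().waitUntilFinish();
  }
}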

On Mon, Dec 28, 2020 at 12:17 PM Boyuan Zhang <boyu...@google.com> wrote:

> Hi Antonio,
>
> Thanks for the data! I want to elaborate more on where the overhead could
> come from when on Flink.
>
> -  with --experiments=use_deprecated_read --fasterCopy=true, I am able to
>> achieve 13K TPS
>
>
> This execution uses the UnboundedSource path, where the checkpoint frequency
> for source reading is configured to be the same as the Flink checkpoint
> interval. In your case, the checkpoint frequency is every 60s.
> Flink reschedules the checkpoint marks to process by reading from state.
> Thus the overhead here could be the time for executing
> source.getCheckpointMark + reading from/writing to state + the overhead of
> Flink checkpoint execution.
>
>
>> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
>> I am able to achieve 10K
>>
>
> This execution uses the Kafka SDF implementation, where the
> checkpoint frequency is configured as every 10000 elements or every 10
> seconds by the OutputAndTimeBoundedSplittableProcessElementInvoker. As you
> mentioned, every poll takes 0.8s and returns 200 elements, so the
> checkpoint frequency here should be every 4s (hitting the 10000 limit).
> The residuals can be from runner-issued checkpoints or SDF self-checkpoints.
> Flink reschedules the residuals by using Timer and State.
> Thus the overhead here could be the time for scheduling timers + reading
> from/writing to state. I would expect to see improvements if we made
> the checkpoints less frequent (for example, every 60s or every 15000
> elements).
>
>
>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
>
>
> This execution uses the UnboundedSourceAsSDFWrapperFn path, where the
> checkpoint frequency is also every 10000 elements or every 10
> seconds. Flink also reschedules the residuals by using Timer and State. So
> the overhead here could be the time for scheduling timers + reading
> from/writing to state + the overhead of the wrapper around the unbounded source.
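
For readers following the three cases above, a rough sketch of the flag
combinations that select each execution path, assuming the Flink runner
dependency is on the classpath; only the runner and flags are taken from this
thread, the rest is illustrative:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExecutionPathFlags {
  public static void main(String[] args) {
    // 1) UnboundedSource path (~13K TPS in Antonio's test): checkpointing
    //    follows the Flink checkpoint interval (60s in this case).
    PipelineOptions deprecatedRead = PipelineOptionsFactory.fromArgs(
        "--runner=FlinkRunner",
        "--experiments=use_deprecated_read",
        "--fasterCopy=true").create();

    // 2) Kafka SDF read (~10K TPS): checkpoints every 10000 elements or
    //    every 10 seconds via OutputAndTimeBoundedSplittableProcessElementInvoker.
    PipelineOptions sdfKafkaRead = PipelineOptionsFactory.fromArgs(
        "--runner=FlinkRunner",
        "--experiments=beam_fn_api,use_sdf_kafka_read",
        "--fasterCopy=true").create();

    // 3) Default: UnboundedSourceAsSDFWrapperFn wrapper (~5K TPS), with the
    //    same 10000-element / 10-second checkpointing plus wrapper overhead.
    PipelineOptions sdfWrapper = PipelineOptionsFactory.fromArgs(
        "--runner=FlinkRunner",
        "--fasterCopy=true").create();
  }
}

With the 60s Flink checkpoint interval used in the test, case 1 checkpoints far
less often than cases 2 and 3, which is consistent with the TPS numbers above.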
>
>
> On Wed, Dec 23, 2020 at 12:30 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> OK,
>>
>> could you make an experiment and increase the parallelism to something
>> significantly higher than the total number of partitions? Say 5 times
>> higher? Would that have an impact on throughput in your case?
>>
>> Jan
>>
>> On 12/23/20 7:03 PM, Antonio Si wrote:
>> > Hi Jan,
>> >
>> > The performance data that I reported was run with parallelism = 8. We
>> > also ran with parallelism = 15 and observed similar behavior, although I
>> > don't have the exact numbers. I can get you the numbers if needed.
>> >
>> > Regarding number of partitions, since we have multiple topics, the
>> number of partitions varies from 180 to 12. The highest TPS topic has 180
>> partitions, while the lowest TPS topic has 12 partitions.
>> >
>> > Thanks.
>> >
>> > Antonio.
>> >
>> > On 2020/12/23 12:28:42, Jan Lukavský <je...@seznam.cz> wrote:
>> >> Hi Antonio,
>> >>
>> >> can you please clarify a few things:
>> >>
>> >>    a) what parallelism you use for your sources
>> >>
>> >>    b) how many partitions there are in your topic(s)
>> >>
>> >> Thanks,
>> >>
>> >>    Jan
>> >>
>> >> On 12/22/20 10:07 PM, Antonio Si wrote:
>> >>> Hi Boyuan,
>> >>>
>> >>> Let me clarify: I have tried with and without the
>> --experiments=beam_fn_api,use_sdf_kafka_read option:
>> >>>
>> >>> -  with --experiments=use_deprecated_read --fasterCopy=true, I am
>> able to achieve 13K TPS
>> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
>> --fasterCopy=true, I am able to achieve 10K
>> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
>> >>>
>> >>> In our test case, we have multiple topics and the checkpoint interval
>> >>> is 60s. Some topics have much higher traffic than others. We looked at
>> >>> the case with the --experiments="beam_fn_api,use_sdf_kafka_read"
>> >>> --fasterCopy=true options a little. Based on our observation, each
>> >>> consumer poll() in ReadFromKafkaDoFn.processElement() takes about
>> >>> 0.8ms. So for topics with high traffic, it will continue in the loop
>> >>> because every poll() returns some records. Every poll returns about
>> >>> 200 records, so it takes about 0.8ms for every 200 records. I am not
>> >>> sure if that is part of the reason for the performance difference.
>> >>>
>> >>> Thanks.
>> >>>
>> >>> Antonio.
>> >>>
>> >>> On 2020/12/21 19:03:19, Boyuan Zhang <boyu...@google.com> wrote:
>> >>>> Hi Antonio,
>> >>>>
>> >>>> Thanks for the data point. That's very valuable information!
>> >>>>
>> >>>> I didn't use DirectRunner. I am using FlinkRunner.
>> >>>>> We measured the number of Kafka messages that we can process per
>> second.
>> >>>>> With Beam v2.26 with --experiments=use_deprecated_read and
>> >>>>> --fasterCopy=true,
>> >>>>> we are able to consume 13K messages per second, but with Beam v2.26
>> >>>>> without the use_deprecated_read option, we are only able to process
>> 10K
>> >>>>> messages
>> >>>>> per second for the same pipeline.
>> >>>> We do have an SDF implementation of the Kafka read instead of going
>> >>>> through the wrapper. Would you like to give it a try to see whether it
>> >>>> improves your situation? You can use
>> >>>> --experiments=beam_fn_api,use_sdf_kafka_read to switch to the Kafka
>> >>>> SDF Read.
>> >>>>
>> >>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >>>>
>> >>>>> Hi Jan,
>> >>>>>> it seems that what we would want is to couple the lifecycle of the
>> >>>>>> Reader not with the restriction but with the particular instance of
>> >>>>>> (Un)boundedSource (after being split). That could be done in the
>> >>>>>> processing DoFn, if it contained a cache mapping instances of the
>> >>>>>> source to the (possibly null, i.e. not yet open) reader. In
>> >>>>>> @NewTracker we could assign (or create) the reader to the tracker, as
>> >>>>>> the tracker is created for each restriction.
>> >>>>>>
>> >>>>>> WDYT?
>> >>>>>>
>> >>>>> I was thinking about this, but it seems like it is not applicable to
>> >>>>> the way UnboundedSource and UnboundedReader work together.
>> >>>>> Please correct me if I'm wrong. The UnboundedReader is created from
>> >>>>> the UnboundedSource per CheckpointMark[1], which means that for certain
>> >>>>> sources, the CheckpointMark could affect some attributes, like the start
>> >>>>> position of the reader when resuming. So a single UnboundedSource could
>> >>>>> be mapped to multiple readers because of different instances of
>> >>>>> CheckpointMark. That's also the reason why we use CheckpointMark as the
>> >>>>> restriction.
>> >>>>>
>> >>>>> Please let me know if I misunderstand your suggestion.
>> >>>>>
>> >>>>> [1]
>> >>>>>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
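
For context, a small hypothetical helper illustrating the point above about
the reader being created per CheckpointMark; the createReader signature comes
from the linked UnboundedSource class, while the helper itself is made up:

import java.io.IOException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

// The reader is created from the source *and* a checkpoint mark, so one
// source instance can map to many readers, one per restriction
// (CheckpointMark).
class ReaderFromRestriction {
  static <T, M extends UnboundedSource.CheckpointMark>
      UnboundedSource.UnboundedReader<T> readerFor(
          UnboundedSource<T, M> source, M checkpointMark, PipelineOptions options)
          throws IOException {
    // See [1]: a null checkpoint mark means "start fresh"; a non-null one
    // can change attributes such as the reader's start position.
    return source.createReader(options, checkpointMark);
  }
}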
>> >>>>>
>> >>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>>> Hi Boyuan,
>> >>>>>>
>> >>>>>> Sorry for my late reply. I was off for a few days.
>> >>>>>>
>> >>>>>> I didn't use DirectRunner. I am using FlinkRunner.
>> >>>>>>
>> >>>>>> We measured the number of Kafka messages that we can process per
>> second.
>> >>>>>> With Beam v2.26 with --experiments=use_deprecated_read and
>> >>>>>> --fasterCopy=true,
>> >>>>>> we are able to consume 13K messages per second, but with Beam v2.26
>> >>>>>> without the use_deprecated_read option, we are only able to
>> process 10K
>> >>>>>> messages
>> >>>>>> per second for the same pipeline.
>> >>>>>>
>> >>>>>> Thanks and regards,
>> >>>>>>
>> >>>>>> Antonio.
>> >>>>>>
>> >>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
>> >>>>>>> Hi Antonio,
>> >>>>>>>
>> >>>>>>> Thanks for the details! Which version of Beam SDK are you using?
>> And are
>> >>>>>>> you using --experiments=beam_fn_api with DirectRunner to launch
>> your
>> >>>>>>> pipeline?
>> >>>>>>>
>> >>>>>>> ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as
>> >>>>>>> its input element; a KafkaConsumer is assigned to this
>> >>>>>>> topic+partition and then polls records continuously. The Kafka
>> >>>>>>> consumer will stop reading and return from the process fn when
>> >>>>>>>
>> >>>>>>>      - There are no records currently available (this is an SDF
>> >>>>>>>      feature called SDF self-initiated checkpoint)
>> >>>>>>>      - The OutputAndTimeBoundedSplittableProcessElementInvoker issues
>> >>>>>>>      a checkpoint request to ReadFromKafkaDoFn to get partial results.
>> >>>>>>>      The checkpoint frequency for the DirectRunner is every 100 output
>> >>>>>>>      records or every 1 second.
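
Not the actual invoker, but a minimal sketch of the output/time-bounded
checkpointing idea described above; the 100-record and 1-second limits are the
DirectRunner defaults mentioned here, and all names in the sketch are made up:

import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of output-and-time-bounded checkpointing:
// processing stops and a checkpoint (residual) is taken once either limit is
// hit, and the remainder is rescheduled later.
class OutputAndTimeBoundedSketch {
  static final int MAX_OUTPUTS = 100;                      // DirectRunner default per the thread
  static final Duration MAX_TIME = Duration.ofSeconds(1);  // DirectRunner default per the thread

  interface Reader {
    boolean advance();          // true if a record is available
    void emitCurrent();         // hand the record downstream
    Object checkpoint();        // capture the residual to resume from
  }

  static Object runUntilBound(Reader reader) {
    Instant start = Instant.now();
    int outputs = 0;
    while (reader.advance()) {
      reader.emitCurrent();
      outputs++;
      boolean hitOutputLimit = outputs >= MAX_OUTPUTS;
      boolean hitTimeLimit =
          Duration.between(start, Instant.now()).compareTo(MAX_TIME) >= 0;
      if (hitOutputLimit || hitTimeLimit) {
        // Runner-issued checkpoint: return a residual to be rescheduled.
        return reader.checkpoint();
      }
    }
    // No records currently available: SDF self-initiated checkpoint/resume.
    return reader.checkpoint();
  }
}

The real invoker is considerably more involved; the point here is just that
the checkpoint frequency, and hence how often residuals get rescheduled via
timers and state, is driven by these two limits.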
>> >>>>>>>
>> >>>>>>> It seems like either the self-initiated checkpoint or the
>> >>>>>>> DirectRunner-issued checkpoint gives you the performance regression,
>> >>>>>>> since there is overhead when rescheduling residuals. In your case,
>> >>>>>>> it's more likely that the checkpoint behavior of
>> >>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>> >>>>>>> elements per batch. I want to understand what kind of performance
>> >>>>>>> regression you are noticing: is it slower to output the same
>> >>>>>>> amount of records?
>> >>>>>>>
>> >>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>>> Hi Boyuan,
>> >>>>>>>>
>> >>>>>>>> This is Antonio. I reported the KafkaIO.read() performance issue
>> on
>> >>>>>> the
>> >>>>>>>> slack channel a few days ago.
>> >>>>>>>>
>> >>>>>>>> I am not sure if this is helpful, but I have been doing some
>> >>>>>> debugging on
>> >>>>>>>> the SDK KafkaIO performance issue for our pipeline and I would
>> like to
>> >>>>>>>> provide some observations.
>> >>>>>>>>
>> >>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was
>> >>>>>>>> invoked within the same thread, and every time kafkaconsumer.poll()
>> >>>>>>>> is called, it returns some records, from 1 up to 200 records. So it
>> >>>>>>>> will proceed to run the pipeline steps. Each kafkaconsumer.poll()
>> >>>>>>>> takes about 0.8ms. So, in this case, the polling and running of the
>> >>>>>>>> pipeline are executed sequentially within a single thread. After
>> >>>>>>>> processing a batch of records, it needs to wait about 0.8ms before
>> >>>>>>>> it can process the next batch of records.
>> >>>>>>>>
>> >>>>>>>> Any suggestions would be appreciated.
>> >>>>>>>>
>> >>>>>>>> Hope that helps.
>> >>>>>>>>
>> >>>>>>>> Thanks and regards,
>> >>>>>>>>
>> >>>>>>>> Antonio.
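
A stripped-down sketch of the single-threaded poll-then-process loop described
above, using the plain Kafka consumer API; the topic, group, and processing
step are placeholders, and this is not the Beam code itself:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollThenProcess {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("group.id", "perf-test");               // placeholder
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(java.util.List.of("test-topic")); // placeholder topic
      while (true) {
        // The observation above: each poll() takes ~0.8ms and returns up to
        // ~200 records, and processing runs on the same thread, so each batch
        // waits for the next poll() before it can be processed.
        ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<byte[], byte[]> record : batch) {
          process(record); // placeholder for the downstream pipeline steps
        }
      }
    }
  }

  static void process(ConsumerRecord<byte[], byte[]> record) {
    // no-op placeholder
  }
}

In the Beam pipeline the poll happens inside ReadFromKafkaDoFn rather than in
user code, but the effect Antonio describes is the same: the ~0.8ms poll and
the processing of its ~200 records run back to back on one thread.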
>> >>>>>>>>
>> >>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>> >>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for
>> >>>>>> tracking.
>> >>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <
>> boyu...@google.com>
>> >>>>>> wrote:
>> >>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution
>> >>>>>>>>>> paths for UnboundedSource and the SDF wrapper are different. It's
>> >>>>>>>>>> highly possible that the regression comes either from the
>> >>>>>>>>>> invocation path for the SDF wrapper or from the implementation of
>> >>>>>>>>>> the SDF wrapper itself.
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <
>> sniem...@apache.org
>> >>>>>>>> wrote:
>> >>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1]
>> >>>>>> yesterday
>> >>>>>>>>>>> that they were seeing significantly reduced performance using
>> >>>>>>>> KafkaIO.Read
>> >>>>>>>>>>> w/ the SDF wrapper vs the unbounded source.  They mentioned
>> they
>> >>>>>> were
>> >>>>>>>> using
>> >>>>>>>>>>> flink 1.9.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <
>> boyu...@google.com>
>> >>>>>>>> wrote:
>> >>>>>>>>>>>> Hi Steve,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I think the major performance regression comes from
>> >>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which
>> >>>>>>>>>>>> checkpoints the DoFn based on a time/output limit and uses
>> >>>>>>>>>>>> timers/state to reschedule work.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> [1]
>> >>>>>>>>>>>>
>> >>>>>>
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> >>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
>> >>>>>> sniem...@apache.org>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some
>> >>>>>> aggregation, and
>> >>>>>>>>>>>>> writes to various places.  Previously, in older versions of
>> >>>>>> beam,
>> >>>>>>>> when
>> >>>>>>>>>>>>> running this in the DirectRunner, messages would go through
>> the
>> >>>>>>>> pipeline
>> >>>>>>>>>>>>> almost instantly, making it very easy to debug locally, etc.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it
>> could
>> >>>>>> take
>> >>>>>>>> on
>> >>>>>>>>>>>>> the order of 5-10 minutes for messages to get from the
>> pubsub
>> >>>>>> read
>> >>>>>>>> step to
>> >>>>>>>>>>>>> the next step in the pipeline (deserializing them, etc).
>> The
>> >>>>>>>> subscription
>> >>>>>>>>>>>>> being read from has on the order of 100,000 elements/sec
>> >>>>>> arriving
>> >>>>>>>> in it.
>> >>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and
>> makes
>> >>>>>> the
>> >>>>>>>>>>>>> pipeline behave as it did before.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner
>> here
>> >>>>>> is
>> >>>>>>>>>>>>> causing some kind of issue, either buffering a very large
>> >>>>>> amount of
>> >>>>>>>> data
>> >>>>>>>>>>>>> before emitting it in a bundle, or something else.  Has
>> anyone
>> >>>>>> else
>> >>>>>>>> run
>> >>>>>>>>>>>>> into this?
>> >>>>>>>>>>>>>
>>
>
