I tried changing my build locally to 10 seconds and 10,000 elements but it
didn't seem to make much of a difference, it still takes a few minutes for
elements to begin actually showing up to downstream stages from the Pubsub
read.  I can see elements being emitted
from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being
committed by ParDoEvaluator.finishBundle, but after that, they seem to just
kind of disappear somewhere.

On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com> wrote:

> Making it as the PipelineOptions was my another proposal but it might take
> some time to do so. On the other hand, tuning the number into
> something acceptable is low-hanging fruit.
>
> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> It sounds reasonable. I am wondering also on the consequence of these
>> parameters for other runners (where it is every 10 seconds or 10000
>> elements) + their own configuration e.g. checkpointInterval,
>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
>> is not clear for me what would be chosen now in this case.
>>
>> I know we are a bit anti knobs but maybe it makes sense to make this
>> configurable via PipelineOptions at least for Direct runner.
>>
>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >
>> > I agree, Ismael.
>> >
>> > From my current investigation, the performance overhead should majorly
>> come from the frequency of checkpoint in
>> OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
>> in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
>> configuring these numbers on DirectRunner should improve reported cases so
>> far. My last proposal was to change the number to every 5 seconds or 10000
>> elements. What do you think?
>> >
>> > [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> > [2]
>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>> >
>> > On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>> >>
>> >> I can guess that the same issues mentioned here probably will affect
>> >> the usability for people trying Beam's interactive SQL on Unbounded IO
>> >> too.
>> >>
>> >> We should really take into account that the performance of the SDF
>> >> based path should be as good or better than the previous version
>> >> before considering its removal (--experiments=use_deprecated_read) and
>> >> probably have consensus when this happens.
>> >>
>> >>
>> >> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >> >
>> >> > > From what I've seen, the direct runner initiates a checkpoint
>> after every element output.
>> >> > That seems like the 1 second limit kicks in before the output
>> reaches 100 elements.
>> >> >
>> >> > I think the original purpose for DirectRunner to use a small limit
>> on issuing checkpoint requests is for exercising SDF better in a small data
>> set. But it brings overhead on a larger set owing to too many checkpoints.
>> It would be ideal to make this limit configurable from pipeline but the
>> easiest approach is that we figure out a number for most common cases. Do
>> you think we raise the limit to 1000 elements or every 5 seconds will help?
>> >> >
>> >> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org>
>> wrote:
>> >> >>
>> >> >> From what I've seen, the direct runner initiates a checkpoint after
>> every element output.
>> >> >>
>> >> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com>
>> wrote:
>> >> >>>
>> >> >>> Hi Antonio,
>> >> >>>
>> >> >>> Thanks for the details! Which version of Beam SDK are you using?
>> And are you using --experiments=beam_fn_api with DirectRunner to launch
>> your pipeline?
>> >> >>>
>> >> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>> topic+partition as input element and a KafkaConsumer will be assigned to
>> this topic+partition then poll records continuously. The Kafka consumer
>> will resume reading and return from the process fn when
>> >> >>>
>> >> >>> There are no available records currently(this is a feature of SDF
>> which calls SDF self-initiated checkpoint)
>> >> >>> The OutputAndTimeBoundedSplittableProcessElementInvoker issues
>> checkpoint request to ReadFromKafkaDoFn for getting partial results. The
>> checkpoint frequency for DirectRunner is every 100 output records or every
>> 1 seconds.
>> >> >>>
>> >> >>> It seems like either the self-initiated checkpoint or DirectRunner
>> issued checkpoint gives you the performance regression since there is
>> overhead when rescheduling residuals. In your case, it's more like that the
>> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
>> gives you 200 elements a batch. I want to understand what kind of
>> performance regression you are noticing? Is it slower to output the same
>> amount of records?
>> >> >>>
>> >> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com>
>> wrote:
>> >> >>>>
>> >> >>>> Hi Boyuan,
>> >> >>>>
>> >> >>>> This is Antonio. I reported the KafkaIO.read() performance issue
>> on the slack channel a few days ago.
>> >> >>>>
>> >> >>>> I am not sure if this is helpful, but I have been doing some
>> debugging on the SDK KafkaIO performance issue for our pipeline and I would
>> like to provide some observations.
>> >> >>>>
>> >> >>>> It looks like in my case the ReadFromKafkaDoFn.processElement()
>> was invoked within the same thread and every time kafaconsumer.poll() is
>> called, it returns some records, from 1 up to 200 records. So, it will
>> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
>> 0.8ms. So, in this case, the polling and running of the pipeline are
>> executed sequentially within a single thread. So, after processing a batch
>> of records, it will need to wait for 0.8ms before it can process the next
>> batch of records again.
>> >> >>>>
>> >> >>>> Any suggestions would be appreciated.
>> >> >>>>
>> >> >>>> Hope that helps.
>> >> >>>>
>> >> >>>> Thanks and regards,
>> >> >>>>
>> >> >>>> Antonio.
>> >> >>>>
>> >> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>> >> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for
>> tracking.
>> >> >>>> >
>> >> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <
>> boyu...@google.com> wrote:
>> >> >>>> >
>> >> >>>> > > Thanks for the pointer, Steve! I'll check it out. The
>> execution paths for
>> >> >>>> > > UnboundedSource and SDF wrapper are different. It's highly
>> possible that
>> >> >>>> > > the regression either comes from the invocation path for SDF
>> wrapper, or
>> >> >>>> > > the implementation of SDF wrapper itself.
>> >> >>>> > >
>> >> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <
>> sniem...@apache.org> wrote:
>> >> >>>> > >
>> >> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1]
>> yesterday
>> >> >>>> > >> that they were seeing significantly reduced performance
>> using KafkaIO.Read
>> >> >>>> > >> w/ the SDF wrapper vs the unbounded source.  They mentioned
>> they were using
>> >> >>>> > >> flink 1.9.
>> >> >>>> > >>
>> >> >>>> > >>
>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>> >> >>>> > >>
>> >> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <
>> boyu...@google.com> wrote:
>> >> >>>> > >>
>> >> >>>> > >>> Hi Steve,
>> >> >>>> > >>>
>> >> >>>> > >>> I think the major performance regression comes from
>> >> >>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
>> which will
>> >> >>>> > >>> checkpoint the DoFn based on time/output limit and use
>> timers/state to
>> >> >>>> > >>> reschedule works.
>> >> >>>> > >>>
>> >> >>>> > >>> [1]
>> >> >>>> > >>>
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> >> >>>> > >>>
>> >> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
>> sniem...@apache.org>
>> >> >>>> > >>> wrote:
>> >> >>>> > >>>
>> >> >>>> > >>>> I have a pipeline that reads from pubsub, does some
>> aggregation, and
>> >> >>>> > >>>> writes to various places.  Previously, in older versions
>> of beam, when
>> >> >>>> > >>>> running this in the DirectRunner, messages would go
>> through the pipeline
>> >> >>>> > >>>> almost instantly, making it very easy to debug locally,
>> etc.
>> >> >>>> > >>>>
>> >> >>>> > >>>> However, after upgrading to beam 2.25, I noticed that it
>> could take on
>> >> >>>> > >>>> the order of 5-10 minutes for messages to get from the
>> pubsub read step to
>> >> >>>> > >>>> the next step in the pipeline (deserializing them, etc).
>> The subscription
>> >> >>>> > >>>> being read from has on the order of 100,000 elements/sec
>> arriving in it.
>> >> >>>> > >>>>
>> >> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and
>> makes the
>> >> >>>> > >>>> pipeline behave as it did before.
>> >> >>>> > >>>>
>> >> >>>> > >>>> It seems like the SDF implementation in the DirectRunner
>> here is
>> >> >>>> > >>>> causing some kind of issue, either buffering a very large
>> amount of data
>> >> >>>> > >>>> before emitting it in a bundle, or something else.  Has
>> anyone else run
>> >> >>>> > >>>> into this?
>> >> >>>> > >>>>
>> >> >>>> > >>>
>> >> >>>> >
>>
>

Reply via email to