I agree, Ismael.

From my current investigation, the performance overhead should mainly come
from the frequency of checkpoints in
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is hardcoded
in the DirectRunner (every 1 second or 100 elements)[2]. I believe
configuring these numbers on DirectRunner should improve the cases reported
so far. My last proposal was to change the limits to every 5 seconds or 10000
elements. What do you think?

[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
[2]
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
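
For a rough sense of scale, here is a back-of-the-envelope sketch in plain Java. It is not Beam code: the class name CheckpointLimitSketch and the method countCheckpoints are made up for illustration. It assumes a steady element rate and assumes that a checkpoint is requested as soon as either the output limit or the time limit is reached. It ignores the actual cost of rescheduling residuals.

```java
import java.time.Duration;

/** Hypothetical model of an output-and-time-bounded checkpoint limit; not Beam code. */
public class CheckpointLimitSketch {

  /**
   * Estimates how many checkpoints would be requested while totalElements are
   * emitted at a steady rate, assuming a checkpoint happens whenever either the
   * output limit or the time limit is reached.
   */
  static long countCheckpoints(
      long totalElements, long elementsPerSecond, long maxOutputs, Duration maxDuration) {
    // Elements that fit into one time window at the given rate (at least one).
    long elementsPerWindow = Math.max(1, elementsPerSecond * maxDuration.toMillis() / 1000);
    // Whichever limit is hit first decides the effective batch size.
    long batchSize = Math.min(maxOutputs, elementsPerWindow);
    // Ceiling division: a final partial batch still ends with a checkpoint.
    return (totalElements + batchSize - 1) / batchSize;
  }

  public static void main(String[] args) {
    long total = 1_000_000L;
    long rate = 100_000L; // elements/sec, the rate reported earlier in this thread

    System.out.println("100 elements or 1 second:    "
        + countCheckpoints(total, rate, 100, Duration.ofSeconds(1)));
    System.out.println("10000 elements or 5 seconds: "
        + countCheckpoints(total, rate, 10_000, Duration.ofSeconds(5)));
  }
}
```

Under these assumptions the output limit dominates at high rates, so raising it from 100 to 10000 cuts the number of checkpoints by the same factor. For a slow source (around one element per second) the time limit dominates instead, which would match the observation of a checkpoint after every element.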

On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> I can guess that the same issues mentioned here probably will affect
> the usability for people trying Beam's interactive SQL on Unbounded IO
> too.
>
> We should really take into account that the performance of the SDF
> based path should be as good or better than the previous version
> before considering its removal (--experiments=use_deprecated_read) and
> probably have consensus when this happens.
>
>
> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
> >
> > > From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> > That suggests the 1 second limit kicks in before the output reaches
> 100 elements.
> >
> > I think the original purpose for DirectRunner to use a small limit on
> issuing checkpoint requests is to exercise SDF better on a small data
> set. But it brings overhead on a larger set owing to too many checkpoints.
> It would be ideal to make this limit configurable from the pipeline, but the
> easiest approach is to figure out a number that works for most common cases.
> Do you think raising the limit to 1000 elements or every 5 seconds will help?
> >
> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org>
> wrote:
> >>
> >> From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> >>
> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>
> >>> Hi Antonio,
> >>>
> >>> Thanks for the details! Which version of Beam SDK are you using? And
> are you using --experiments=beam_fn_api with DirectRunner to launch your
> pipeline?
> >>>
> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> topic+partition as input element and a KafkaConsumer will be assigned to
> this topic+partition then poll records continuously. The Kafka consumer
> will resume reading and return from the process fn when:
> >>>
> >>> - There are no available records currently (this is a feature of SDF
> called SDF self-initiated checkpoint).
> >>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
> checkpoint request to ReadFromKafkaDoFn for getting partial results. The
> checkpoint frequency for DirectRunner is every 100 output records or every
> 1 second.
> >>>
> >>> It seems like either the self-initiated checkpoint or the DirectRunner
> issued checkpoint gives you the performance regression, since there is
> overhead when rescheduling residuals. In your case, it's more likely that the
> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> gives you 200 elements a batch. I want to understand what kind of
> performance regression you are noticing. Is it slower to output the same
> amount of records?
> >>>
> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com>
> wrote:
> >>>>
> >>>> Hi Boyuan,
> >>>>
> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on
> the slack channel a few days ago.
> >>>>
> >>>> I am not sure if this is helpful, but I have been doing some
> debugging on the SDK KafkaIO performance issue for our pipeline and I would
> like to provide some observations.
> >>>>
> >>>> It looks like in my case the ReadFromKafkaDoFn.processElement() was
> invoked within the same thread and every time kafkaconsumer.poll() is
> called, it returns some records, from 1 up to 200 records. So, it will
> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
> 0.8ms. So, in this case, the polling and running of the pipeline are
> executed sequentially within a single thread. So, after processing a batch
> of records, it will need to wait for 0.8ms before it can process the next
> batch of records again.
> >>>>
> >>>> Any suggestions would be appreciated.
> >>>>
> >>>> Hope that helps.
> >>>>
> >>>> Thanks and regards,
> >>>>
> >>>> Antonio.
> >>>>
> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for
> tracking.
> >>>> >
> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>> >
> >>>> > > Thanks for the pointer, Steve! I'll check it out. The execution
> paths for
> >>>> > > UnboundedSource and SDF wrapper are different. It's highly
> possible that
> >>>> > > the regression either comes from the invocation path for SDF
> wrapper, or
> >>>> > > the implementation of SDF wrapper itself.
> >>>> > >
> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org>
> wrote:
> >>>> > >
> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1]
> yesterday
> >>>> > >> that they were seeing significantly reduced performance using
> KafkaIO.Read
> >>>> > >> w/ the SDF wrapper vs the unbounded source.  They mentioned they
> were using
> >>>> > >> flink 1.9.
> >>>> > >>
> >>>> > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>> > >>
> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com>
> wrote:
> >>>> > >>
> >>>> > >>> Hi Steve,
> >>>> > >>>
> >>>> > >>> I think the major performance regression comes from
> >>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which
> will
> >>>> > >>> checkpoint the DoFn based on time/output limit and use
> timers/state to
> >>>> > >>> reschedule work.
> >>>> > >>>
> >>>> > >>> [1]
> >>>> > >>>
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>> > >>>
> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
> sniem...@apache.org>
> >>>> > >>> wrote:
> >>>> > >>>
> >>>> > >>>> I have a pipeline that reads from pubsub, does some
> aggregation, and
> >>>> > >>>> writes to various places.  Previously, in older versions of
> beam, when
> >>>> > >>>> running this in the DirectRunner, messages would go through
> the pipeline
> >>>> > >>>> almost instantly, making it very easy to debug locally, etc.
> >>>> > >>>>
> >>>> > >>>> However, after upgrading to beam 2.25, I noticed that it could
> take on
> >>>> > >>>> the order of 5-10 minutes for messages to get from the pubsub
> read step to
> >>>> > >>>> the next step in the pipeline (deserializing them, etc).  The
> subscription
> >>>> > >>>> being read from has on the order of 100,000 elements/sec
> arriving in it.
> >>>> > >>>>
> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes
> the
> >>>> > >>>> pipeline behave as it did before.
> >>>> > >>>>
> >>>> > >>>> It seems like the SDF implementation in the DirectRunner here
> is
> >>>> > >>>> causing some kind of issue, either buffering a very large
> amount of data
> >>>> > >>>> before emitting it in a bundle, or something else.  Has anyone
> else run
> >>>> > >>>> into this?
> >>>> > >>>>
> >>>> > >>>
> >>>> >
>
