Hi Ismaël,

What I meant by the performance vs. testing argument is that when choosing default values for certain (possibly configurable) options, we should prefer choices that result in better-tested code, not better performance. DirectRunner actually does quite a few things that are suboptimal performance-wise but are worth doing for test purposes (immutability checks, for example).

Regarding SDF in general, I can confirm we see some issues with Flink, most recently [1] (which I'm trying to fix right now). That is actually a correctness issue, not a performance issue. I personally haven't noticed any performance issues so far.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-11481

On 12/17/20 3:24 PM, Ismaël Mejía wrote:
The influence of checkpointing on the output of the results should be
minimal, in particular for the Direct Runner. What Steve reports here
seems to be something different. Jan, have you or others already
checked the influence of this on Flink, which is now using this new
translation path?

I think the argument that the Direct runner is mostly about testing
and not about performance is an argument that works against Beam;
one should not necessarily exclude the other. Direct runner is our
most used runner, basically every Beam user relies on the direct
runner, so every regression or improvement on it affects everyone, but
well, that's a subject worth its own thread.

On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

From my point of view the numbers in DirectRunner are set correctly. The primary
purpose of DirectRunner is testing, not performance, so DirectRunner intentionally
makes frequent checkpoints to easily expose potential bugs in user
code. It might be possible to make the frequency configurable, though.

Jan

On 12/17/20 12:20 AM, Boyuan Zhang wrote:

It's not a portable execution on DirectRunner so I would expect that outputs 
from OutputAndTimeBoundedSplittableProcessElementInvoker should be emitted 
immediately. For SDF execution on DirectRunner, the overhead could come from 
the SDF expansion, SDF wrapper and the invoker.

Steve, based on your findings, it seems like it takes more time for the SDF 
pipeline to actually start to read from PubSub and more time to output records. 
Are you able to tell how much time each part is taking?

On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <rober...@google.com> wrote:
If all it takes is bumping these numbers up a bit, that seems like a reasonable 
thing to do ASAP. (I would argue that perhaps they shouldn't be static, e.g. it 
might be preferable to start emitting results right away, but use larger 
batches for the steady state if there are performance benefits.)
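
The idea of non-static limits can be sketched roughly as follows. This is only a toy model in Python, not Beam code: the class name, the doubling factor, and the starting and maximum limits are all assumptions chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class GrowingCheckpointLimit:
    """Hypothetical policy: small first bundles, larger ones in steady state."""

    initial_outputs: int = 100    # assumed starting limit
    max_outputs: int = 10_000     # assumed steady-state cap
    growth_factor: int = 2        # assumed growth per checkpoint

    def __post_init__(self) -> None:
        self.current = self.initial_outputs

    def next_limit(self) -> int:
        """Return the limit for the next bundle, then grow it toward the cap."""
        limit = self.current
        self.current = min(self.current * self.growth_factor, self.max_outputs)
        return limit


policy = GrowingCheckpointLimit()
limits = [policy.next_limit() for _ in range(9)]
print(limits)  # [100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000]
```

With these assumed values the first results would be emitted after 100 outputs, while the steady state would checkpoint only every 10,000 outputs.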

That being said, it sounds like there's something deeper going on here. We 
should also verify that this performance impact is limited to the direct runner.

On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <sniem...@apache.org> wrote:
I tried changing my build locally to 10 seconds and 10,000 elements, but it
didn't seem to make much of a difference: it still takes a few minutes for
elements to actually begin showing up in downstream stages from the Pubsub
read. I can see elements being emitted from
OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being 
committed by ParDoEvaluator.finishBundle, but after that, they seem to just 
kind of disappear somewhere.

On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com> wrote:
Making it a PipelineOptions setting was my other proposal, but it might take some
time to do so. On the other hand, tuning the numbers to something acceptable
is low-hanging fruit.

On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <ieme...@gmail.com> wrote:
It sounds reasonable. I am also wondering about the consequences of these
parameters for other runners (where it is every 10 seconds or 10000
elements) combined with their own configuration, e.g. checkpointInterval,
checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
is not clear to me what would be chosen in this case.

I know we are a bit anti-knobs, but maybe it makes sense to make this
configurable via PipelineOptions, at least for the Direct runner.

On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
I agree, Ismael.

From my current investigation, the performance overhead should mostly come
from the checkpoint frequency in
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is hardcoded in
the DirectRunner (every 1 second or 100 elements)[2]. I believe tuning
these numbers on DirectRunner should improve the cases reported so far. My last
proposal was to change the numbers to every 5 seconds or 10000 elements. What do
you think?
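
To get a feel for how the two settings compare, here is a back-of-the-envelope model in Python. It is not Beam code: it assumes a constant output rate and that a checkpoint is requested as soon as either limit is reached. The record count and the rate are hypothetical numbers, not measurements.

```python
def count_checkpoints(total_elements, elements_per_second, max_outputs, max_seconds):
    """Count checkpoints in a toy model where outputs arrive at a constant rate."""
    # Outputs produced before the time limit fires (at least one per bundle).
    by_time = max(1, elements_per_second * max_seconds)
    # A checkpoint is requested as soon as either limit is reached.
    per_bundle = min(max_outputs, by_time)
    # Ceiling division: a trailing partial bundle still ends with a checkpoint.
    return -(-total_elements // per_bundle)


TOTAL = 1_000_000   # hypothetical number of records
RATE = 1_000        # hypothetical steady output rate, records per second

current = count_checkpoints(TOTAL, RATE, max_outputs=100, max_seconds=1)
proposed = count_checkpoints(TOTAL, RATE, max_outputs=10_000, max_seconds=5)
print(current, proposed)  # 10000 200
```

Under these assumptions the element limit decides the bundle size for the current setting, while the 5-second limit decides it for the proposed one.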

[1] 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
[2] 
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181

On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
I can guess that the same issues mentioned here probably will affect
the usability for people trying Beam's interactive SQL on Unbounded IO
too.

We should really make sure that the performance of the SDF
based path is as good as or better than the previous version
before considering its removal (--experiments=use_deprecated_read), and
probably have consensus when this happens.


On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
From what I've seen, the direct runner initiates a checkpoint after every
element output.
It seems like the 1-second limit kicks in before the output reaches 100
elements.

I think the original purpose of DirectRunner using a small limit for issuing
checkpoint requests is to exercise SDF better on a small data set. But it
brings overhead on a larger data set owing to too many checkpoints. It would be
ideal to make this limit configurable from the pipeline, but the easiest approach is
to figure out numbers that work for the most common cases. Do you think raising the
limit to 1000 elements or every 5 seconds will help?

On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
From what I've seen, the direct runner initiates a checkpoint after every
element output.

On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
Hi Antonio,

Thanks for the details! Which version of Beam SDK are you using? And are you 
using --experiments=beam_fn_api with DirectRunner to launch your pipeline?

ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its
input element; a KafkaConsumer is assigned to this topic+partition and then
polls records continuously. The Kafka consumer will resume reading and return
from the process fn when:

- There are no available records currently (this is an SDF feature called an
  SDF self-initiated checkpoint).
- The OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint
  request to ReadFromKafkaDoFn to get partial results. The checkpoint
  frequency for DirectRunner is every 100 output records or every 1 second.
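
The two return conditions above can be sketched as a toy loop in Python. This is not the real ReadFromKafkaDoFn or invoker: poll results are simulated with a deque of lists, the function name and return labels are made up for illustration, and the 1-second time limit is left out to keep the example deterministic.

```python
from collections import deque


def process_element(batches, max_outputs=100):
    """Toy model of one processElement() call; not the real ReadFromKafkaDoFn.

    `batches` is a deque of lists, one list per simulated poll() result.
    Returns the emitted records and the reason the call returned.
    """
    emitted = []
    while batches:
        records = batches.popleft()
        if not records:
            # Nothing available right now: the DoFn checkpoints itself.
            return emitted, "self-initiated checkpoint"
        for i, record in enumerate(records):
            emitted.append(record)
            if len(emitted) >= max_outputs:
                # Output limit reached: the runner asks for a checkpoint and
                # the unread records become the residual for the next call.
                if records[i + 1:]:
                    batches.appendleft(records[i + 1:])
                return emitted, "runner-initiated checkpoint"
    return emitted, "input exhausted"


polls = deque([list(range(60)), list(range(60, 150)), [], [150]])
first = process_element(polls)
second = process_element(polls)
third = process_element(polls)
print(len(first[0]), first[1])    # 100 runner-initiated checkpoint
print(len(second[0]), second[1])  # 50 self-initiated checkpoint
print(len(third[0]), third[1])    # 1 input exhausted
```

Each return hands the remaining work back to be rescheduled, which is where the per-checkpoint overhead discussed in this thread would come from.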

It seems like either the self-initiated checkpoint or the DirectRunner-issued
checkpoint gives you the performance regression, since there is overhead when
rescheduling residuals. In your case, it's more likely that the checkpoint
behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
elements a batch. I want to understand what kind of performance regression you
are noticing. Is it slower to output the same amount of records?

On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
Hi Boyuan,

This is Antonio. I reported the KafkaIO.read() performance issue on the slack 
channel a few days ago.

I am not sure if this is helpful, but I have been doing some debugging on the 
SDK KafkaIO performance issue for our pipeline and I would like to provide some 
observations.

It looks like in my case ReadFromKafkaDoFn.processElement() was invoked
within the same thread, and every time kafkaconsumer.poll() is called, it returns
some records, from 1 up to 200 records. It then proceeds to run the pipeline
steps. Each kafkaconsumer.poll() takes about 0.8ms. So, in this case, the
polling and the running of the pipeline are executed sequentially within a single
thread: after processing a batch of records, it needs to wait 0.8ms
before it can process the next batch of records.
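
As a rough illustration of what this sequential pattern implies, the arithmetic can be written out in Python. The 0.8 ms poll time and the 1 to 200 batch sizes come from the observation above; the per-batch processing time is a hypothetical value, since it was not reported.

```python
def sequential_throughput(batch_size, poll_ms, process_ms):
    """Records per second when poll and processing alternate on one thread."""
    seconds_per_batch = (poll_ms + process_ms) / 1000.0
    return batch_size / seconds_per_batch


POLL_MS = 0.8  # poll latency reported in the message above

# Upper bounds from polling alone (processing time assumed to be zero).
for batch_size in (1, 200):
    print(batch_size, round(sequential_throughput(batch_size, POLL_MS, 0.0)))

# Hypothetical: 3.2 ms of pipeline work per 200-record batch.
print(round(sequential_throughput(200, POLL_MS, 3.2)))
```

Polling alone therefore caps a single thread at roughly 1,250 records per second for 1-record batches and roughly 250,000 for 200-record batches; any processing time lowers that further.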

Any suggestions would be appreciated.

Hope that helps.

Thanks and regards,

Antonio.

On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.

On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:

Thanks for the pointer, Steve! I'll check it out. The execution paths for
UnboundedSource and SDF wrapper are different. It's highly possible that
the regression either comes from the invocation path for SDF wrapper, or
the implementation of SDF wrapper itself.

On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:

Coincidentally, someone else in the ASF slack mentioned [1] yesterday
that they were seeing significantly reduced performance using KafkaIO.Read
w/ the SDF wrapper vs the unbounded source.  They mentioned they were using
flink 1.9.

[1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900

On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:

Hi Steve,

I think the major performance regression comes from
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
checkpoint the DoFn based on time/output limit and use timers/state to
reschedule work.

[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
wrote:

I have a pipeline that reads from pubsub, does some aggregation, and
writes to various places.  Previously, in older versions of beam, when
running this in the DirectRunner, messages would go through the pipeline
almost instantly, making it very easy to debug locally, etc.

However, after upgrading to beam 2.25, I noticed that it could take on
the order of 5-10 minutes for messages to get from the pubsub read step to
the next step in the pipeline (deserializing them, etc).  The subscription
being read from has on the order of 100,000 elements/sec arriving in it.

Setting --experiments=use_deprecated_read fixes it, and makes the
pipeline behave as it did before.

It seems like the SDF implementation in the DirectRunner here is
causing some kind of issue, either buffering a very large amount of data
before emitting it in a bundle, or something else.  Has anyone else run
into this?
