> Primary purpose of DirectRunner is testing, not
performance
That's one argument, but it's very difficult to
effectively test a pipeline when I need to wait 15+
minutes for the first element to go through it. I also
disagree in general that we shouldn't care about the
performance of the DirectRunner. It's likely the first
runner new users of Beam try (I know it was for us), and
if it doesn't provide enough performance to actually run
a representative pipeline, users may extrapolate that
performance onto other runners (I know we did).
Anecdotally, the fact that the DirectRunner didn't work
for some of our initial test pipelines (because of
performance problems) probably delayed our adoption of
Beam by at least 6 months.
> Steve, based on your findings, it seems like it takes
more time for the SDF pipeline to actually start to read
from PubSub and more time to output records.
Pubsub reads start almost instantly, but I'm not able to
see any elements actually output from it for a LONG time,
sometimes 30+ minutes. I see the reader acking back to
Pubsub, so it IS committing, but no elements are output.
After a bunch of debugging, I think I finally figured out
what the problem is, though. During a checkpoint (in
trySplit), the UnboundedSourceViaSDF wrapper will close
the current source reader and create a new one. The
problem is that the Pubsub reader needs some time to
correctly estimate its watermark [1], and because it
gets closed and recreated so frequently due to
checkpointing (triggered by either number of elements or
duration), it can never actually provide accurate
estimates, and always returns the min watermark. This
seems to prevent some internal timers from ever firing,
effectively holding all elements in the pipeline state.
I can also confirm this by looking at WatermarkManager,
where I see all the bundles pending.
[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
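
To make the failure mode concrete, here is a toy model of it in Python.
It is not Beam code: ToyReader, warmup_secs and simulate are
hypothetical names, and the warm-up rule (report the min watermark
until the reader has been alive for warmup_secs) is only an assumed
stand-in for whatever sampling the real Pubsub reader does in [1]. The
sketch shows that if the reader is recreated at every checkpoint and
the checkpoint interval is shorter than the warm-up, the reported
watermark never leaves the minimum.

```python
from dataclasses import dataclass, field
from typing import List

# Stand-in for the minimum possible watermark value.
MIN_WATERMARK = float("-inf")


@dataclass
class ToyReader:
    """Hypothetical reader that only trusts its watermark after a warm-up."""
    created_at: int
    warmup_secs: int
    seen: List[int] = field(default_factory=list)

    def observe(self, event_time: int) -> None:
        self.seen.append(event_time)

    def watermark(self, now: int) -> float:
        # Assumed rule: too young or no samples yet, so report the minimum.
        if now - self.created_at < self.warmup_secs or not self.seen:
            return MIN_WATERMARK
        # Simplification: use the newest event time as the estimate.
        return self.seen[-1]


def simulate(total_secs, checkpoint_every_secs, warmup_secs, recreate):
    """Return the watermark reported at each second of a simulated run."""
    reader = ToyReader(created_at=0, warmup_secs=warmup_secs)
    reported = []
    for now in range(1, total_secs + 1):
        reader.observe(event_time=now)  # one message arrives per second
        reported.append(reader.watermark(now))
        if recreate and now % checkpoint_every_secs == 0:
            # Checkpoint: close the reader and start a fresh one.
            reader = ToyReader(created_at=now, warmup_secs=warmup_secs)
    return reported


if __name__ == "__main__":
    stuck = simulate(60, checkpoint_every_secs=1, warmup_secs=10, recreate=True)
    healthy = simulate(60, checkpoint_every_secs=1, warmup_secs=10, recreate=False)
    print(all(w == MIN_WATERMARK for w in stuck))
    print(healthy[-1])
```

In this sketch, lengthening the checkpoint interval only helps once it
reaches the warm-up period. That would be consistent with the
10-second experiment not helping much, if the real reader needs longer
than that to produce an estimate, but that part is a guess.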
On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Ismaël,
what I meant by the performance vs. testing argument is that when
choosing default values for certain (possibly configurable) options, we
should prefer choices that result in better tested code, not better
performance. DirectRunner actually does quite a few things that are
suboptimal performance-wise but are worth doing for test purposes
(immutability checks, for example).
Regarding SDF in general, I can confirm we see some issues with Flink,
most recently [1] (which I'm trying to fix right now). That is actually
a correctness issue, not a performance issue. I personally haven't
noticed any performance issues so far.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11481
On 12/17/20 3:24 PM, Ismaël Mejía wrote:
> The influence of checkpointing on the output of the results should be
> minimal, in particular for the Direct Runner. What Steve reports
> here seems to be something different. Jan, have you or others already
> checked the influence of this on Flink, which is now using this new
> translation path?
>
> I think the argument that the Direct runner is mostly about testing
> and not about performance is one that works against Beam;
> one should not necessarily exclude the other. The Direct runner is our
> most used runner. Basically every Beam user relies on the direct
> runner, so every regression or improvement in it affects everyone, but
> well, that's a subject worth its own thread.
>
> On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> from my point of view the numbers in DirectRunner are set correctly.
>> The primary purpose of DirectRunner is testing, not performance, so
>> DirectRunner intentionally makes frequent checkpoints to easily
>> exercise potential bugs in user code. It might be possible to make
>> the frequency configurable, though.
>>
>> Jan
>>
>> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>
>> It's not a portable execution on DirectRunner so I
would expect that outputs from
OutputAndTimeBoundedSplittableProcessElementInvoker
should be emitted immediately. For SDF execution on
DirectRunner, the overhead could come from the SDF
expansion, SDF wrapper and the invoker.
>>
>> Steve, based on your findings, it seems like it
takes more time for the SDF pipeline to actually
start to read from PubSub and more time to output
records. Are you able to tell how much time each part
is taking?
>>
>> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>> If all it takes is bumping these numbers up a
bit, that seems like a reasonable thing to do ASAP.
(I would argue that perhaps they shouldn't be static,
e.g. it might be preferable to start emitting results
right away, but use larger batches for the steady
state if there are performance benefits.)
>>>
>>> That being said, it sounds like there's something
deeper going on here. We should also verify that this
performance impact is limited to the direct runner.
>>>
>>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>> I tried changing my build locally to 10 seconds and 10,000
>>>> elements, but it didn't seem to make much of a difference: it
>>>> still takes a few minutes for elements to begin actually showing
>>>> up in downstream stages from the Pubsub read. I can see elements
>>>> being emitted from
>>>> OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles
>>>> being committed by ParDoEvaluator.finishBundle, but after that,
>>>> they seem to just disappear somewhere.
>>>>
>>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>> Making it a PipelineOption was my other proposal, but it might
>>>>> take some time to do so. On the other hand, tuning the numbers to
>>>>> something acceptable is low-hanging fruit.
>>>>>
>>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>> It sounds reasonable. I am also wondering about the consequences
>>>>>> of these parameters for other runners (where it is every 10
>>>>>> seconds or 10000 elements) combined with their own configuration,
>>>>>> e.g. checkpointInterval, checkpointTimeoutMillis and
>>>>>> minPauseBetweenCheckpoints for Flink. It is not clear to me
>>>>>> which would be chosen in this case.
>>>>>>
>>>>>> I know we are a bit anti knobs but maybe it
makes sense to make this
>>>>>> configurable via PipelineOptions at least for
Direct runner.
>>>>>>
>>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>> I agree, Ismael.
>>>>>>>
>>>>>>> From my current investigation, the performance overhead should
>>>>>>> mainly come from the frequency of checkpoints in
>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is
>>>>>>> hardcoded in the DirectRunner (every 1 second or 100
>>>>>>> elements)[2]. I believe tuning these numbers on DirectRunner
>>>>>>> should improve the cases reported so far. My last proposal was
>>>>>>> to change the numbers to every 5 seconds or 10000 elements. What
>>>>>>> do you think?
>>>>>>>
>>>>>>> [1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>> [2]
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>>>>>>>
>>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>> I can guess that the same issues mentioned
here probably will affect
>>>>>>>> the usability for people trying Beam's
interactive SQL on Unbounded IO
>>>>>>>> too.
>>>>>>>>
>>>>>>>> We should really take into account that the
performance of the SDF
>>>>>>>> based path should be as good or better than
the previous version
>>>>>>>> before considering its removal
(--experiments=use_deprecated_read) and
>>>>>>>> probably have consensus when this happens.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>> From what I've seen, the direct runner
initiates a checkpoint after every element output.
>>>>>>>>> It seems like the 1 second limit kicks in before the output
>>>>>>>>> reaches 100 elements.
>>>>>>>>>
>>>>>>>>> I think the original purpose of having DirectRunner use a
>>>>>>>>> small limit for issuing checkpoint requests was to exercise
>>>>>>>>> SDF better on a small data set. But it adds overhead on a
>>>>>>>>> larger set owing to too many checkpoints. It would be ideal to
>>>>>>>>> make this limit configurable from the pipeline, but the
>>>>>>>>> easiest approach is to figure out a number for the most common
>>>>>>>>> cases. Do you think raising the limit to 1000 elements or
>>>>>>>>> every 5 seconds would help?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>> From what I've seen, the direct runner
initiates a checkpoint after every element output.
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>> Hi Antonio,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the details! Which version of
Beam SDK are you using? And are you using
--experiments=beam_fn_api with DirectRunner to launch
your pipeline?
>>>>>>>>>>>
>>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>>>>>>>>>>> topic+partition as the input element, and a KafkaConsumer
>>>>>>>>>>> will be assigned to this topic+partition and then poll
>>>>>>>>>>> records continuously. The Kafka consumer will resume reading
>>>>>>>>>>> and return from the process fn when:
>>>>>>>>>>>
>>>>>>>>>>> - There are no available records currently (this is a
>>>>>>>>>>>   feature of SDF called an SDF self-initiated checkpoint).
>>>>>>>>>>> - The OutputAndTimeBoundedSplittableProcessElementInvoker
>>>>>>>>>>>   issues a checkpoint request to ReadFromKafkaDoFn to get
>>>>>>>>>>>   partial results. The checkpoint frequency for DirectRunner
>>>>>>>>>>>   is every 100 output records or every 1 second.
>>>>>>>>>>>
>>>>>>>>>>> It seems like either the self-initiated checkpoint or the
>>>>>>>>>>> DirectRunner-issued checkpoint gives you the performance
>>>>>>>>>>> regression, since there is overhead when rescheduling
>>>>>>>>>>> residuals. In your case, it's more likely that the
>>>>>>>>>>> checkpoint behavior of
>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives
>>>>>>>>>>> you 200 elements a batch. I want to understand what kind of
>>>>>>>>>>> performance regression you are noticing. Is it slower to
>>>>>>>>>>> output the same number of records?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>>>>>>>>>>>> Hi Boyuan,
>>>>>>>>>>>>
>>>>>>>>>>>> This is Antonio. I reported the
KafkaIO.read() performance issue on the slack channel
a few days ago.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if this is helpful, but I
have been doing some debugging on the SDK KafkaIO
performance issue for our pipeline and I would like
to provide some observations.
>>>>>>>>>>>>
>>>>>>>>>>>> It looks like in my case
>>>>>>>>>>>> ReadFromKafkaDoFn.processElement() was invoked within the
>>>>>>>>>>>> same thread, and every time kafkaconsumer.poll() is called,
>>>>>>>>>>>> it returns some records, from 1 up to 200. It then proceeds
>>>>>>>>>>>> to run the pipeline steps. Each kafkaconsumer.poll() takes
>>>>>>>>>>>> about 0.8ms. So, in this case, the polling and the running
>>>>>>>>>>>> of the pipeline are executed sequentially within a single
>>>>>>>>>>>> thread, and after processing a batch of records, it needs
>>>>>>>>>>>> to wait 0.8ms before it can process the next batch.
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Antonio.
>>>>>>>>>>>>
>>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>> Opened
https://issues.apache.org/jira/browse/BEAM-11403 for
tracking.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll
check it out. The execution paths for
>>>>>>>>>>>>>> UnboundedSource and SDF wrapper are
different. It's highly possible that
>>>>>>>>>>>>>> the regression either comes from the
invocation path for SDF wrapper, or
>>>>>>>>>>>>>> the implementation of SDF wrapper itself.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Coincidentally, someone else in the
ASF slack mentioned [1] yesterday
>>>>>>>>>>>>>>> that they were seeing significantly
reduced performance using KafkaIO.Read
>>>>>>>>>>>>>>> w/ the SDF wrapper vs the unbounded
source. They mentioned they were using
>>>>>>>>>>>>>>> flink 1.9.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Steve,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the major performance regression comes from
>>>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
>>>>>>>>>>>>>>>> which will checkpoint the DoFn based on a time/output
>>>>>>>>>>>>>>>> limit and use timers/state to reschedule work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a pipeline that reads from
pubsub, does some aggregation, and
>>>>>>>>>>>>>>>>> writes to various places.
Previously, in older versions of beam, when
>>>>>>>>>>>>>>>>> running this in the DirectRunner,
messages would go through the pipeline
>>>>>>>>>>>>>>>>> almost instantly, making it very
easy to debug locally, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, after upgrading to beam
2.25, I noticed that it could take on
>>>>>>>>>>>>>>>>> the order of 5-10 minutes for
messages to get from the pubsub read step to
>>>>>>>>>>>>>>>>> the next step in the pipeline
(deserializing them, etc). The subscription
>>>>>>>>>>>>>>>>> being read from has on the order of
100,000 elements/sec arriving in it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Setting
--experiments=use_deprecated_read fixes it, and makes the
>>>>>>>>>>>>>>>>> pipeline behave as it did before.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems like the SDF
implementation in the DirectRunner here is
>>>>>>>>>>>>>>>>> causing some kind of issue, either
buffering a very large amount of data
>>>>>>>>>>>>>>>>> before emitting it in a bundle, or
something else. Has anyone else run
>>>>>>>>>>>>>>>>> into this?
>>>>>>>>>>>>>>>>>