> Primary purpose of DirectRunner is testing, not performance
That's one argument, but it's very difficult to effectively test a pipeline when I need to wait 15+ minutes for the first element to go through it. I also disagree in general that we shouldn't care about the performance of the DirectRunner. It's likely the first runner new users of Beam try (I know it was for us), and if it doesn't provide enough performance to actually run a representative pipeline, users may extrapolate that performance onto other runners (I know we did). Anecdotally, the fact that the DirectRunner didn't work for some of our initial test pipelines (because of performance problems) probably delayed our adoption of Beam by at least 6 months.
> Steve, based on your findings, it seems like it takes more time for the SDF pipeline to actually start to read from PubSub and more time to output records.
Pubsub reads start ~instantly, but I'm not able to see any elements actually output from it for a LONG time, sometimes 30+ minutes. I see the reader acking back to pubsub, so it IS committing, but no elements are output.
After a bunch of debugging, I think I finally figured out what the problem is, though. During a checkpoint (in trySplit), the UnboundedSourceViaSDF wrapper will close the current source reader and create a new one. The problem is that the pubsub reader needs some time to correctly estimate its watermark [1], and because it gets closed and recreated so frequently due to checkpointing (on either number of elements or duration), it can never actually provide accurate estimates and always returns the min watermark. This seems to prevent some internal timers from ever firing, effectively holding all elements in the pipeline state. I can also confirm this by looking at WatermarkManager, where I see all the bundles pending.

[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
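To make the failure mode concrete, here is a simplified sketch of the idea (this is not the actual PubsubUnboundedSource code; the class name and threshold are made up):

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Simplified: a reader whose watermark estimate only becomes meaningful after it
    // has observed enough data. If the reader is closed and recreated on every
    // checkpoint, this accumulated state is thrown away each time, so getWatermark()
    // keeps returning the minimum timestamp and downstream timers never fire.
    class SampledWatermarkEstimator {
      private static final int MIN_SAMPLES = 100;  // hypothetical threshold
      private int samplesSeen = 0;
      private Instant estimate = BoundedWindow.TIMESTAMP_MIN_VALUE;

      void observe(Instant elementTimestamp) {
        samplesSeen++;
        estimate = elementTimestamp;
      }

      Instant getWatermark() {
        // Not enough history yet: fall back to the minimum watermark.
        return samplesSeen < MIN_SAMPLES ? BoundedWindow.TIMESTAMP_MIN_VALUE : estimate;
      }
    }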
On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <[email protected]> wrote:
Hi Ismaël,
what I meant by the performance vs. testing argument is that when choosing default values for certain (possibly configurable) options, we should prefer choices that result in better-tested code, not better performance. DirectRunner actually does quite a few things that are suboptimal performance-wise, but that are good to do for test purposes (immutability checks, for example).
Regarding SDF in general, I can confirm we see some issues with Flink, most recently [1] (which I'm trying to fix right now). That is actually a correctness issue, not a performance issue. I personally haven't noticed any performance issues so far.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11481
On 12/17/20 3:24 PM, Ismaël Mejía wrote:
> The influence of checkpointing on the output of the results should be
> minimal, in particular for the Direct Runner. What Steve reports here
> seems to be something different. Jan, have you or others already
> checked the influence of this on Flink, which is now using this new
> translation path?
>
> I think the argument that the Direct runner is mostly about testing
> and not about performance is an argument that plays badly for Beam;
> one should not necessarily exclude the other. The Direct runner is our
> most used runner: basically every Beam user relies on it, so every
> regression or improvement on it affects everyone. But that's a subject
> worth its own thread.
>
> On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <[email protected]> wrote:
>> Hi,
>>
>> from my point of view the numbers in DirectRunner are set correctly. The primary purpose of DirectRunner is testing, not performance, so DirectRunner intentionally makes frequent checkpoints to easily exercise potential bugs in user code. It might be possible to make the frequency configurable, though.
>>
>> Jan
>>
>> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>
>> It's not a portable execution on DirectRunner, so I would expect outputs from OutputAndTimeBoundedSplittableProcessElementInvoker to be emitted immediately. For SDF execution on DirectRunner, the overhead could come from the SDF expansion, the SDF wrapper, and the invoker.
>>
>> Steve, based on your findings, it seems like it takes more time for the SDF pipeline to actually start to read from PubSub and more time to output records. Are you able to tell how much time each part is taking?
>>
>> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <[email protected]> wrote:
>>> If all it takes is bumping these numbers up a bit, that seems like a reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be static; e.g. it might be preferable to start emitting results right away, but use larger batches for the steady state if there are performance benefits.)
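As a sketch of what non-static limits could look like (nothing like this exists in Beam today; the class and numbers below are illustrative only):

    // Illustrative only: ramp the per-checkpoint output limit up from 1, so the first
    // results are emitted immediately while steady-state processing still gets
    // larger batches.
    class RampingOutputLimit {
      private static final int MAX_LIMIT = 10000;  // assumed steady-state batch size
      private int currentLimit = 1;                // emit the very first result right away

      int nextLimit() {
        int limit = currentLimit;
        currentLimit = Math.min(currentLimit * 2, MAX_LIMIT);  // double until the cap
        return limit;
      }
    }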
>>>
>>> That being said, it sounds like there's something deeper going on here. We should also verify that this performance impact is limited to the direct runner.
>>>
>>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <[email protected]> wrote:
>>>> I tried changing my build locally to 10 seconds and 10,000 elements, but it didn't seem to make much of a difference; it still takes a few minutes for elements to begin actually showing up in downstream stages from the Pubsub read. I can see elements being emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being committed by ParDoEvaluator.finishBundle, but after that they seem to just kind of disappear somewhere.
>>>>
>>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <[email protected]> wrote:
>>>>> Making it a PipelineOption was another proposal of mine, but it might take some time to do. On the other hand, tuning the numbers to something acceptable is low-hanging fruit.
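A rough sketch of what such a PipelineOptions hook could look like (these option names and defaults do not exist in Beam; they only illustrate the proposal):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical options interface; names and defaults are placeholders.
    public interface DirectSdfCheckpointOptions extends PipelineOptions {
      @Description("Max number of outputs before the DirectRunner requests an SDF checkpoint.")
      @Default.Integer(10000)
      int getSdfCheckpointMaxOutputs();
      void setSdfCheckpointMaxOutputs(int value);

      @Description("Max seconds before the DirectRunner requests an SDF checkpoint.")
      @Default.Integer(10)
      int getSdfCheckpointMaxSeconds();
      void setSdfCheckpointMaxSeconds(int value);
    }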
>>>>>
>>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <[email protected]> wrote:
>>>>>> It sounds reasonable. I am also wondering about the consequences of these
>>>>>> parameters for other runners (where it is every 10 seconds or 10000
>>>>>> elements), plus their own configuration, e.g. checkpointInterval,
>>>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
>>>>>> is not clear to me what would be chosen in this case.
>>>>>>
>>>>>> I know we are a bit anti-knobs, but maybe it makes sense to make this
>>>>>> configurable via PipelineOptions, at least for the Direct runner.
>>>>>>
>>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>> I agree, Ismael.
>>>>>>>
>>>>>>> From my current investigation, the performance overhead should mostly come from the checkpoint frequency in OutputAndTimeBoundedSplittableProcessElementInvoker [1], which is hardcoded in the DirectRunner (every 1 second or 100 elements) [2]. I believe configuring these numbers on DirectRunner should improve the cases reported so far. My last proposal was to change the numbers to every 5 seconds or 10000 elements. What do you think?
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>> [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
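In other words, the invoker in [1] is handed a static "checkpoint after N outputs or T elapsed" rule by the DirectRunner code in [2]. A minimal, self-contained sketch of that rule, using the numbers discussed in this thread (this is not the actual class from [1]):

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Sketch of the checkpoint rule only; the real logic lives in
    // OutputAndTimeBoundedSplittableProcessElementInvoker.
    class CheckpointPolicy {
      private final int maxOutputs;
      private final Duration maxTime;
      private final Instant start = Instant.now();
      private int outputs = 0;

      CheckpointPolicy(int maxOutputs, Duration maxTime) {
        this.maxOutputs = maxOutputs;
        this.maxTime = maxTime;
      }

      // Called once per output element; true means "request a checkpoint now".
      boolean shouldCheckpointAfterOutput() {
        outputs++;
        return outputs >= maxOutputs || Instant.now().isAfter(start.plus(maxTime));
      }
    }

    // DirectRunner today (per [2]): new CheckpointPolicy(100, Duration.standardSeconds(1))
    // Proposal above:               new CheckpointPolicy(10000, Duration.standardSeconds(5))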
>>>>>>>
>>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>> I can guess that the same issues mentioned here will probably affect
>>>>>>>> usability for people trying Beam's interactive SQL on unbounded IO too.
>>>>>>>>
>>>>>>>> We should really take into account that the performance of the SDF-based
>>>>>>>> path should be as good as or better than the previous version before
>>>>>>>> considering its removal (--experiments=use_deprecated_read), and we should
>>>>>>>> probably have consensus when this happens.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>> From what I've seen, the direct runner initiates a checkpoint after every element output.
>>>>>>>>> That sounds like the 1 second limit kicks in before the output reaches 100 elements.
>>>>>>>>>
>>>>>>>>> I think the original purpose of having DirectRunner use a small limit for issuing checkpoint requests was to exercise SDF better on a small data set, but it brings overhead on a larger data set owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number that works for most common cases. Do you think raising the limit to 1000 elements or every 5 seconds would help?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <[email protected]> wrote:
>>>>>>>>>> From what I've seen, the direct runner initiates a checkpoint after every element output.
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>> Hi Antonio,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
>>>>>>>>>>>
>>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition as its input element; a KafkaConsumer is assigned to this topic+partition and then polls records continuously. The Kafka consumer will pause reading and return from the process fn when either:
>>>>>>>>>>>
>>>>>>>>>>> 1. There are no records currently available (this is an SDF feature: the SDF performs a self-initiated checkpoint).
>>>>>>>>>>> 2. The OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
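A heavily simplified sketch of that read loop (not the real ReadFromKafkaDoFn; restriction setup and consumer wiring are omitted, and the class name is made up) showing the two ways the process call can return:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    class SketchReadFromKafkaDoFn extends DoFn<TopicPartition, ConsumerRecord<String, String>> {
      private transient Consumer<String, String> consumer;  // assigned to the input topic+partition

      @ProcessElement
      public ProcessContinuation processElement(
          @Element TopicPartition topicPartition,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<ConsumerRecord<String, String>> receiver) {
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
          if (records.isEmpty()) {
            return ProcessContinuation.resume();  // (1) nothing available: self-initiated checkpoint
          }
          for (ConsumerRecord<String, String> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              return ProcessContinuation.stop();  // (2) runner-issued checkpoint took effect
            }
            receiver.output(record);
          }
        }
      }
    }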
>>>>>>>>>>>
>>>>>>>>>>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint is giving you the performance regression, since there is overhead when rescheduling residuals. In your case, it's more likely that the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200 elements per batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same amount of records?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:
>>>>>>>>>>>> Hi Boyuan,
>>>>>>>>>>>>
>>>>>>>>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations.
>>>>>>>>>>>>
>>>>>>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaConsumer.poll() is called it returns some records, from 1 up to 200, and then proceeds to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8ms, so the polling and the running of the pipeline are executed sequentially within a single thread: after processing a batch of records, it needs to wait about 0.8ms before it can process the next batch of records.
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Antonio.
>>>>>>>>>>>>
>>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different. It's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Steve,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of Beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, after upgrading to Beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.
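For reference, a sketch of how that experiment flag can be passed when constructing a pipeline (the surrounding class and the other arguments are just examples):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UseDeprecatedReadExample {
      public static void main(String[] args) {
        // Opt back into the legacy UnboundedSource-based read path while debugging.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--runner=DirectRunner",
                    "--experiments=use_deprecated_read")
                .create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pubsub-reading pipeline here ...
        pipeline.run().waitUntilFinish();
      }
    }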
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
>>>>>>>>>>>>>>>>>