> Primary purpose of DirectRunner is testing,
not performance
That's one argument, but it's very difficult to effectively test a pipeline when I need to wait 15+ minutes for the first element to go through it. I also disagree in general that we shouldn't care about the performance of the DirectRunner. It's likely the first runner new users of Beam try (I know it was for us), and if it doesn't provide enough performance to actually run a representative pipeline, users may extrapolate that performance onto other runners (I know we did). Anecdotally, the fact that the DirectRunner didn't work for some of our initial test pipelines (because of performance problems) probably delayed our adoption of Beam by at least 6 months.
> Steve, based on your findings, it seems like
it takes more time for the SDF pipeline to
actually start to read from PubSub and more
time to output records.
Pubsub reads start ~instantly, but I'm not able to see any elements actually output from it for a LONG time, sometimes 30+ minutes. I see the reader acking back to Pubsub, so it IS committing, but no elements are output.
After a bunch of debugging, I think I've finally figured out what the problem is. During a checkpoint (in trySplit), the UnboundedSourceViaSDF wrapper will close the current source reader and create a new one. The problem is, the Pubsub reader needs some time to correctly estimate its watermark [1], and because it gets closed and recreated so frequently due to checkpointing (on either number of elements or duration), it can never actually provide accurate estimates and always returns the min watermark. This seems to prevent some internal timers from ever firing, effectively holding all elements in the pipeline state. I can confirm this by looking at WatermarkManager, where I see all the bundles pending.
[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
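The failure mode described above can be illustrated with a minimal, self-contained sketch (hypothetical class and method names, not Beam's actual implementation): a sample-based watermark estimator reports the minimum watermark until it has observed enough timestamps, so a reader that is closed and recreated before its sample buffer fills never advances past the min.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a sample-based watermark estimator. Until it has
// seen minSamples timestamps it reports the min watermark, which is roughly
// what a freshly (re)created reader does.
class SampleBasedWatermarkEstimator {
    // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE.
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    private final int minSamples;
    private final ArrayDeque<Long> samples = new ArrayDeque<>();

    SampleBasedWatermarkEstimator(int minSamples) {
        this.minSamples = minSamples;
    }

    void addTimestamp(long ts) {
        samples.add(ts);
    }

    // Returns the min watermark until enough samples have been observed.
    long getWatermark() {
        if (samples.size() < minSamples) {
            return MIN_WATERMARK;
        }
        return samples.stream().mapToLong(Long::longValue).min().getAsLong();
    }
}
```

If checkpointing recreates the reader before minSamples messages have been observed, getWatermark() never advances, and any downstream event-time timers keyed off it never fire.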
On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Ismaël,
what I meant by the performance vs. testing argument is that when choosing default values for certain (possibly configurable) options, we should prefer choices that result in better-tested code, not better performance. DirectRunner actually does quite a few things that are suboptimal performance-wise, but are good to do for test purposes (immutability checks, for example).
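As a rough illustration of the kind of check Jan mentions (a hypothetical sketch, not DirectRunner's actual implementation): a test-oriented runner can afford to snapshot each element before handing it to user code and fail loudly if user code mutates its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of an immutability check: snapshot the element before
// user code runs, then verify the element was not mutated. This trades
// performance for catching a common user-code bug, which is the point of a
// test-oriented runner.
class ImmutabilityCheck {
    static void processChecked(List<Integer> element, Consumer<List<Integer>> userFn) {
        // Stand-in for re-encoding the element and comparing encoded bytes.
        List<Integer> snapshot = new ArrayList<>(element);
        userFn.accept(element);
        if (!element.equals(snapshot)) {
            throw new IllegalStateException("user code mutated its input element");
        }
    }
}
```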
Regarding SDF in general, I can confirm we see some issues with Flink, most recently [1] (which I'm trying to fix right now). That is actually a correctness issue, not a performance issue. I personally haven't noticed any performance issues so far.
Jan
[1]
https://issues.apache.org/jira/browse/BEAM-11481
On 12/17/20 3:24 PM, Ismaël Mejía wrote:
> The influence of checkpointing on the output of the results should be minimal, in particular for the Direct Runner. What Steve reports here seems to be something different. Jan, have you or others already checked the influence of this on Flink, which is now using this new translation path?
>
> I think the argument that the Direct runner is mostly about testing and not about performance is an argument that plays badly for Beam; one should not necessarily exclude the other. The Direct runner is our most used runner; basically every Beam user relies on it, so every regression or improvement on it affects everyone. But, well, that's a subject worth its own thread.
>
> On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> from my point of view, the numbers in DirectRunner are set correctly. The primary purpose of DirectRunner is testing, not performance, so DirectRunner intentionally makes frequent checkpoints to easily exercise potential bugs in user code. It might be possible to make the frequency configurable, though.
>>
>> Jan
>>
>> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>
>> It's not a portable execution on DirectRunner, so I would expect that outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be emitted immediately. For SDF execution on DirectRunner, the overhead could come from the SDF expansion, the SDF wrapper, and the invoker.
>>
>> Steve, based on your findings, it seems
like it takes more time for the SDF
pipeline to actually start to read from
PubSub and more time to output records. Are
you able to tell how much time each part is
taking?
>>
>> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>> If all it takes is bumping these
numbers up a bit, that seems like a
reasonable thing to do ASAP. (I would argue
that perhaps they shouldn't be static, e.g.
it might be preferable to start emitting
results right away, but use larger batches
for the steady state if there are
performance benefits.)
>>>
>>> That being said, it sounds like there's
something deeper going on here. We should
also verify that this performance impact is
limited to the direct runner.
>>>
>>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>> I tried changing my build locally to 10 seconds and 10,000 elements, but it didn't seem to make much of a difference; it still takes a few minutes for elements to begin actually showing up in downstream stages from the Pubsub read. I can see elements being emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being committed by ParDoEvaluator.finishBundle, but after that they seem to just kind of disappear somewhere.
>>>>
>>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>> Making it a PipelineOptions option was another proposal of mine, but it might take some time to do. On the other hand, tuning the numbers to something acceptable is low-hanging fruit.
>>>>>
>>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>> It sounds reasonable. I am also wondering about the consequences of these parameters for other runners (where it is every 10 seconds or 10000 elements) plus their own configuration, e.g. checkpointInterval, checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It is not clear to me what would be chosen now in this case.
>>>>>>
>>>>>> I know we are a bit anti-knobs, but maybe it makes sense to make this configurable via PipelineOptions, at least for the Direct runner.
>>>>>>
>>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>> I agree, Ismael.
>>>>>>>
>>>>>>> From my current investigation, the performance overhead should mostly come from the checkpoint frequency in OutputAndTimeBoundedSplittableProcessElementInvoker [1], which is hardcoded in the DirectRunner (every 1 second or 100 elements) [2]. I believe configuring these numbers on DirectRunner should improve the cases reported so far. My last proposal was to change the numbers to every 5 seconds or 10000 elements. What do you think?
>>>>>>>
>>>>>>> [1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>> [2]
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
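For intuition, the hardcoded limits in [2] amount to a time-or-count checkpoint trigger along these lines (a hypothetical, self-contained sketch with made-up names, not the actual invoker code): a checkpoint is requested as soon as either the element count or the bundle duration limit is hit, whichever comes first.

```java
// Hypothetical sketch of a time-or-count checkpoint trigger like the one
// hardcoded in DirectRunner (100 elements or 1 second). The proposal in this
// thread is effectively to raise these two constructor arguments.
class CheckpointTrigger {
    private final int maxElements;
    private final long maxDurationMillis;
    private long bundleStartMillis;
    private int elementCount;

    CheckpointTrigger(int maxElements, long maxDurationMillis) {
        this.maxElements = maxElements;
        this.maxDurationMillis = maxDurationMillis;
    }

    void startBundle(long nowMillis) {
        bundleStartMillis = nowMillis;
        elementCount = 0;
    }

    // Called once per output element; true means "checkpoint now".
    boolean onElement(long nowMillis) {
        elementCount++;
        return elementCount >= maxElements
            || nowMillis - bundleStartMillis >= maxDurationMillis;
    }
}
```

With a high-throughput source like the Pubsub subscription Steve describes, the count limit trips almost immediately, so the bundle size (and thus the checkpoint rate) is governed by maxElements rather than the time limit.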
>>>>>>>
>>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>> I can guess that the same issues
mentioned here probably will affect
>>>>>>>> the usability for people trying
Beam's interactive SQL on Unbounded IO
>>>>>>>> too.
>>>>>>>>
>>>>>>>> We should really take into account
that the performance of the SDF
>>>>>>>> based path should be as good or
better than the previous version
>>>>>>>> before considering its removal
(--experiments=use_deprecated_read) and
>>>>>>>> probably have consensus when this
happens.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>> From what I've seen, the direct
runner initiates a checkpoint after every
element output.
>>>>>>>>> That seems like the 1 second
limit kicks in before the output reaches
100 elements.
>>>>>>>>>
>>>>>>>>> I think the original purpose of DirectRunner using a small limit on issuing checkpoint requests is to exercise SDF better on a small data set, but it brings overhead on a larger set owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number for the most common cases. Do you think raising the limit to 1000 elements or every 5 seconds will help?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>> From what I've seen, the direct
runner initiates a checkpoint after every
element output.
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>> Hi Antonio,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the details! Which
version of Beam SDK are you using? And are
you using --experiments=beam_fn_api with
DirectRunner to launch your pipeline?
>>>>>>>>>>>
>>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka topic+partition as an input element, and a KafkaConsumer will be assigned to this topic+partition to poll records continuously. The Kafka consumer will resume reading and return from the process fn when:
>>>>>>>>>>>
>>>>>>>>>>> There are no available records currently (an SDF feature called self-initiated checkpoint), or
>>>>>>>>>>> the OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
>>>>>>>>>>>
>>>>>>>>>>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint gives you the performance regression, since there is overhead when rescheduling residuals. In your case, it's more likely that the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200-element batches. I want to understand what kind of performance regression you are noticing. Is it slower to output the same amount of records?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>>>>>>>>>>>> Hi Boyuan,
>>>>>>>>>>>>
>>>>>>>>>>>> This is Antonio. I reported
the KafkaIO.read() performance issue on the
slack channel a few days ago.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if this is
helpful, but I have been doing some
debugging on the SDK KafkaIO performance
issue for our pipeline and I would like to
provide some observations.
>>>>>>>>>>>>
>>>>>>>>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaConsumer.poll() is called, it returns some records, from 1 up to 200 records, and then proceeds to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8 ms. So, in this case, the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records, it needs to wait 0.8 ms before it can process the next batch of records.
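A quick back-of-envelope check on those numbers (a sketch under the assumption that processing time is negligible and the ~0.8 ms poll latency is paid once per batch): when polling and processing share one thread, poll overhead alone caps throughput.

```java
// Rough bound implied by the observations above: with polling and processing
// sequential on a single thread, each batch pays the poll latency, so poll
// overhead alone caps throughput at batchSize / pollSeconds records per second.
class PollOverheadBound {
    static double maxRecordsPerSec(int batchSize, double pollMillis) {
        return batchSize / (pollMillis / 1000.0);
    }
}
```

At 200-record batches the bound is roughly 250,000 records/s, but at 1-record batches it collapses to about 1,250/s, which is one way small, checkpoint-shortened batches can hurt throughput even when each poll is cheap.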
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions would be
appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Antonio.
>>>>>>>>>>>>
>>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>> Opened
https://issues.apache.org/jira/browse/BEAM-11403
for tracking.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the pointer,
Steve! I'll check it out. The execution
paths for
>>>>>>>>>>>>>> UnboundedSource and SDF
wrapper are different. It's highly possible
that
>>>>>>>>>>>>>> the regression either comes
from the invocation path for SDF wrapper, or
>>>>>>>>>>>>>> the implementation of SDF
wrapper itself.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Coincidentally, someone else in the ASF Slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Steve,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a pipeline that
reads from pubsub, does some aggregation, and
>>>>>>>>>>>>>>>>> writes to various places.
Previously, in older versions of beam, when
>>>>>>>>>>>>>>>>> running this in the
DirectRunner, messages would go through the
pipeline
>>>>>>>>>>>>>>>>> almost instantly, making
it very easy to debug locally, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, after upgrading
to beam 2.25, I noticed that it could take on
>>>>>>>>>>>>>>>>> the order of 5-10 minutes
for messages to get from the pubsub read
step to
>>>>>>>>>>>>>>>>> the next step in the
pipeline (deserializing them, etc). The
subscription
>>>>>>>>>>>>>>>>> being read from has on
the order of 100,000 elements/sec arriving
in it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Setting
--experiments=use_deprecated_read fixes it,
and makes the
>>>>>>>>>>>>>>>>> pipeline behave as it did
before.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems like the SDF
implementation in the DirectRunner here is
>>>>>>>>>>>>>>>>> causing some kind of
issue, either buffering a very large amount
of data
>>>>>>>>>>>>>>>>> before emitting it in a
bundle, or something else. Has anyone else run
>>>>>>>>>>>>>>>>> into this?
>>>>>>>>>>>>>>>>>