> Primary purpose of DirectRunner is testing, not performance
That's one argument, but it's very difficult to effectively test a
pipeline when I need to wait 15+ minutes for the first element to go
through it. I also, disagree in general that we shouldn't care about
the performance of the DirectRunner. It's likely the first runner new
users of beam try (I know it was for us), and if it doesn't provide
enough performance to actually run a representative pipeline, users
may extrapolate that performance onto other runners (I know we did).
Anecdotally, the fact that the DirectRunner didn't work for some of
our initial test pipelines (because of performance problems) probably
delayed our adoption of beam by at least 6 months.
> Steve, based on your findings, it seems like it takes more time for
the SDF pipeline to actually start to read from PubSub and more time
to output records.
Pubsub reads start ~instantly. but I'm not able to see any elements
actually output from it for a LONG time, sometimes 30+ minutes. I see
the reader acking back to pubsub, so it IS committing, but no elements
output.
After a bunch of debugging, I think I finally figured out what the
problem is though. During a checkpoint (in trySplit), the
UnboundedSourceViaSDF wrapper will close the current source reader and
create a new one. The problem is, the pubsub reader needs some time
to correctly estimate it's watermark [1], and because it gets closed
and recreated so frequently due to checkpointing (either number of
elements, or duration), it can never actually provide accurate
estimates, and always returns the min watermark. This seems like it
prevents some internal timers from ever firing, effectively holding
all elements in the pipeline state. I can confirm this also by
looking at WatermarkManager, where I see all the bundles pending.
[1]
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <[email protected]
<mailto:[email protected]>> wrote:
Hi Ismaël,
what I meant by the performance vs. testing argument is that when
choosing default values for certain (possibly configurable)
options, we
should prefer choices that result in better tested code, not better
performance. DirectRunner actually does quite many things that are
suboptimal performance-wise, but are good to be done for test
purposes
(immutability checks, as an example).
Regarding SDF in general, I can confirm we see some issues with
Flink,
most recently [1] (which I'm trying to fix right now). That is
actually
correctness, not performance issue. I personally didn't notice any
performance issues, so far.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11481
On 12/17/20 3:24 PM, Ismaël Mejía wrote:
> The influence of checkpointing on the output of the results
should be
> minimal in particular for Direct Runner. It seems what Steve reports
> here seems to be something different. Jan have you or others already
> checked the influence of this on Flink who is now using this new
> translation path?
>
> I think the argument that the Direct runner is mostly about testing
> and not about performance is an argument that is playing bad on
Beam,
> one should not necessarily exclude the other. Direct runner is our
> most used runner, basically every Beam user relies on the direct
> runners so every regression or improvement on it affects
everyone, but
> well that's a subject worth its own thread.
>
> On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <[email protected]
<mailto:[email protected]>> wrote:
>> Hi,
>>
>> from my point of view the number in DirectRunner are set
correctly. Primary purpose of DirectRunner is testing, not
performance, so DirectRunner makes intentionally frequent
checkpoints to easily exercise potential bugs in user code. It
might be possible to make the frequency configurable, though.
>>
>> Jan
>>
>> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>
>> It's not a portable execution on DirectRunner so I would expect
that outputs from
OutputAndTimeBoundedSplittableProcessElementInvoker should be
emitted immediately. For SDF execution on DirectRunner, the
overhead could come from the SDF expansion, SDF wrapper and the
invoker.
>>
>> Steve, based on your findings, it seems like it takes more time
for the SDF pipeline to actually start to read from PubSub and
more time to output records. Are you able to tell how much time
each part is taking?
>>
>> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw
<[email protected] <mailto:[email protected]>> wrote:
>>> If all it takes is bumping these numbers up a bit, that seems
like a reasonable thing to do ASAP. (I would argue that perhaps
they shouldn't be static, e.g. it might be preferable to start
emitting results right away, but use larger batches for the steady
state if there are performance benefits.)
>>>
>>> That being said, it sounds like there's something deeper going
on here. We should also verify that this performance impact is
limited to the direct runner.
>>>
>>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz
<[email protected] <mailto:[email protected]>> wrote:
>>>> I tried changing my build locally to 10 seconds and 10,000
elements but it didn't seem to make much of a difference, it still
takes a few minutes for elements to begin actually showing up to
downstream stages from the Pubsub read. I can see elements being
emitted from OutputAndTimeBoundedSplittableProcessElementInvoker,
and bundles being committed by ParDoEvaluator.finishBundle, but
after that, they seem to just kind of disappear somewhere.
>>>>
>>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>> Making it as the PipelineOptions was my another proposal but
it might take some time to do so. On the other hand, tuning the
number into something acceptable is low-hanging fruit.
>>>>>
>>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía
<[email protected] <mailto:[email protected]>> wrote:
>>>>>> It sounds reasonable. I am wondering also on the
consequence of these
>>>>>> parameters for other runners (where it is every 10 seconds
or 10000
>>>>>> elements) + their own configuration e.g. checkpointInterval,
>>>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for
Flink. It
>>>>>> is not clear for me what would be chosen now in this case.
>>>>>>
>>>>>> I know we are a bit anti knobs but maybe it makes sense to
make this
>>>>>> configurable via PipelineOptions at least for Direct runner.
>>>>>>
>>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>> I agree, Ismael.
>>>>>>>
>>>>>>> From my current investigation, the performance overhead
should majorly come from the frequency of checkpoint in
OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is
hardcoded in the DirectRunner(every 1 seconds or 100 elements)[2].
I believe configuring these numbers on DirectRunner should improve
reported cases so far. My last proposal was to change the number
to every 5 seconds or 10000 elements. What do you think?
>>>>>>>
>>>>>>> [1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>> [2]
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>>>>>>>
>>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>> I can guess that the same issues mentioned here probably
will affect
>>>>>>>> the usability for people trying Beam's interactive SQL on
Unbounded IO
>>>>>>>> too.
>>>>>>>>
>>>>>>>> We should really take into account that the performance
of the SDF
>>>>>>>> based path should be as good or better than the previous
version
>>>>>>>> before considering its removal
(--experiments=use_deprecated_read) and
>>>>>>>> probably have consensus when this happens.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>> From what I've seen, the direct runner initiates a
checkpoint after every element output.
>>>>>>>>> That seems like the 1 second limit kicks in before the
output reaches 100 elements.
>>>>>>>>>
>>>>>>>>> I think the original purpose for DirectRunner to use a
small limit on issuing checkpoint requests is for exercising SDF
better in a small data set. But it brings overhead on a larger set
owing to too many checkpoints. It would be ideal to make this
limit configurable from pipeline but the easiest approach is that
we figure out a number for most common cases. Do you think we
raise the limit to 1000 elements or every 5 seconds will help?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>> From what I've seen, the direct runner initiates a
checkpoint after every element output.
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>> Hi Antonio,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the details! Which version of Beam SDK are
you using? And are you using --experiments=beam_fn_api with
DirectRunner to launch your pipeline?
>>>>>>>>>>>
>>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a
Kafka topic+partition as input element and a KafkaConsumer will be
assigned to this topic+partition then poll records continuously.
The Kafka consumer will resume reading and return from the process
fn when
>>>>>>>>>>>
>>>>>>>>>>> There are no available records currently(this is a
feature of SDF which calls SDF self-initiated checkpoint)
>>>>>>>>>>> The
OutputAndTimeBoundedSplittableProcessElementInvoker issues
checkpoint request to ReadFromKafkaDoFn for getting partial
results. The checkpoint frequency for DirectRunner is every 100
output records or every 1 seconds.
>>>>>>>>>>>
>>>>>>>>>>> It seems like either the self-initiated checkpoint or
DirectRunner issued checkpoint gives you the performance
regression since there is overhead when rescheduling residuals. In
your case, it's more like that the checkpoint behavior of
OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
elements a batch. I want to understand what kind of performance
regression you are noticing? Is it slower to output the same
amount of records?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>>> Hi Boyuan,
>>>>>>>>>>>>
>>>>>>>>>>>> This is Antonio. I reported the KafkaIO.read()
performance issue on the slack channel a few days ago.
>>>>>>>>>>>>
>>>>>>>>>>>> I am not sure if this is helpful, but I have been
doing some debugging on the SDK KafkaIO performance issue for our
pipeline and I would like to provide some observations.
>>>>>>>>>>>>
>>>>>>>>>>>> It looks like in my case the
ReadFromKafkaDoFn.processElement() was invoked within the same
thread and every time kafaconsumer.poll() is called, it returns
some records, from 1 up to 200 records. So, it will proceed to run
the pipeline steps. Each kafkaconsumer.poll() takes about 0.8ms.
So, in this case, the polling and running of the pipeline are
executed sequentially within a single thread. So, after processing
a batch of records, it will need to wait for 0.8ms before it can
process the next batch of records again.
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions would be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Antonio.
>>>>>>>>>>>>
>>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>>>> Opened
https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out.
The execution paths for
>>>>>>>>>>>>>> UnboundedSource and SDF wrapper are different. It's
highly possible that
>>>>>>>>>>>>>> the regression either comes from the invocation
path for SDF wrapper, or
>>>>>>>>>>>>>> the implementation of SDF wrapper itself.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack
mentioned [1] yesterday
>>>>>>>>>>>>>>> that they were seeing significantly reduced
performance using KafkaIO.Read
>>>>>>>>>>>>>>> w/ the SDF wrapper vs the unbounded source. They
mentioned they were using
>>>>>>>>>>>>>>> flink 1.9.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang
<[email protected] <mailto:[email protected]>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Steve,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the major performance regression comes from
>>>>>>>>>>>>>>>>
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
>>>>>>>>>>>>>>>> checkpoint the DoFn based on time/output limit
and use timers/state to
>>>>>>>>>>>>>>>> reschedule works.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz
<[email protected] <mailto:[email protected]>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does
some aggregation, and
>>>>>>>>>>>>>>>>> writes to various places. Previously, in older
versions of beam, when
>>>>>>>>>>>>>>>>> running this in the DirectRunner, messages would
go through the pipeline
>>>>>>>>>>>>>>>>> almost instantly, making it very easy to debug
locally, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed
that it could take on
>>>>>>>>>>>>>>>>> the order of 5-10 minutes for messages to get
from the pubsub read step to
>>>>>>>>>>>>>>>>> the next step in the pipeline (deserializing
them, etc). The subscription
>>>>>>>>>>>>>>>>> being read from has on the order of 100,000
elements/sec arriving in it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes
it, and makes the
>>>>>>>>>>>>>>>>> pipeline behave as it did before.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems like the SDF implementation in the
DirectRunner here is
>>>>>>>>>>>>>>>>> causing some kind of issue, either buffering a
very large amount of data
>>>>>>>>>>>>>>>>> before emitting it in a bundle, or something
else. Has anyone else run
>>>>>>>>>>>>>>>>> into this?
>>>>>>>>>>>>>>>>>