This is not portable execution on the DirectRunner, so I would expect outputs from OutputAndTimeBoundedSplittableProcessElementInvoker to be emitted immediately. For SDF execution on the DirectRunner, the overhead could come from the SDF expansion, the SDF wrapper, and the invoker.

Steve, based on your findings, it seems like the SDF pipeline takes more time both to start reading from PubSub and to output records. Are you able to tell how much time each part is taking?
On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <[email protected]> wrote:
If all it takes is bumping these numbers up a bit, that seems like a reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be static; e.g. it might be preferable to start emitting results right away, but use larger batches for the steady state if there are performance benefits.)
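
A sketch of what I mean, purely illustrative (none of this exists today):

    import org.joda.time.Duration;

    // Start with a tiny output limit so the first results are emitted
    // immediately, then grow toward a larger steady-state batch size.
    class AdaptiveCheckpointLimit {
      private static final int STEADY_STATE_LIMIT = 10000;
      private int currentLimit = 1;

      int nextOutputLimit() {
        int limit = currentLimit;
        currentLimit = Math.min(currentLimit * 10, STEADY_STATE_LIMIT);
        return limit;
      }

      // Keep a short time bound so quiet sources still flush promptly.
      Duration timeLimit() {
        return Duration.standardSeconds(1);
      }
    }
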
That being said, it sounds like there's something deeper going on here. We should also verify that this performance impact is limited to the direct runner.
On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <[email protected]> wrote:
I tried changing my build locally to 10 seconds and 10,000 elements, but it didn't seem to make much of a difference; it still takes a few minutes for elements to begin actually showing up in downstream stages from the Pubsub read. I can see elements being emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being committed by ParDoEvaluator.finishBundle, but after that they seem to just disappear somewhere.
On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <[email protected]> wrote:
Making this configurable via PipelineOptions was another proposal of mine, but it might take some time to do. On the other hand, tuning the numbers to something acceptable is low-hanging fruit.
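
For illustration, such options could look something like this (just a sketch; none of these names exist today):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    /** Hypothetical knobs for the DirectRunner's SDF checkpoint frequency. */
    public interface DirectSdfCheckpointOptions extends PipelineOptions {
      @Description("Request an SDF checkpoint after this many output elements.")
      @Default.Integer(10000)
      int getSdfCheckpointMaxElements();
      void setSdfCheckpointMaxElements(int value);

      @Description("Request an SDF checkpoint after this many seconds, whichever comes first.")
      @Default.Integer(5)
      int getSdfCheckpointMaxSeconds();
      void setSdfCheckpointMaxSeconds(int value);
    }
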
On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <[email protected]> wrote:
It sounds reasonable. I am also wondering about the consequences of these parameters for other runners (where it is every 10 seconds or 10000 elements), plus their own configuration, e.g. checkpointInterval, checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It is not clear to me what would be chosen now in this case.
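
For reference, those Flink knobs are set roughly like this today (option names from memory, so double-check FlinkPipelineOptions):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class FlinkCheckpointKnobs {
      public static void main(String[] args) {
        FlinkPipelineOptions opts =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        opts.setCheckpointingInterval(10_000L);   // ms between checkpoints
        opts.setCheckpointTimeoutMillis(60_000L);
        opts.setMinPauseBetweenCheckpoints(1_000L);
      }
    }
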
I know we are a bit anti-knobs, but maybe it makes sense to make this configurable via PipelineOptions, at least for the Direct runner.
On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <[email protected]> wrote:
>
> I agree, Ismael.
>
> From my current investigation, the performance overhead should mostly come from the frequency of checkpointing in OutputAndTimeBoundedSplittableProcessElementInvoker [1], which is hardcoded in the DirectRunner (every 1 second or 100 elements) [2]. I believe tuning these numbers on the DirectRunner should improve the cases reported so far. My last proposal was to change them to every 5 seconds or 10000 elements. What do you think?
>
> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
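>
> For concreteness, the two hardcoded values at [2] amount to (paraphrased; the constant names are mine):
>
>     import org.joda.time.Duration;
>
>     // Today's DirectRunner limits, per [2]; the proposal raises them to
>     // 10000 elements and Duration.standardSeconds(5).
>     int maxOutputsPerCheckpoint = 100;
>     Duration maxCheckpointDuration = Duration.standardSeconds(1);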
>
> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <[email protected]> wrote:
>>
>> I can guess that the same issues mentioned here will probably also affect the usability of Beam's interactive SQL on unbounded IO.
>>
>> We should really take into account that the performance of the SDF-based path should be as good as or better than the previous version before considering its removal (--experiments=use_deprecated_read), and we should probably have consensus when that happens.
>>
>>
>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <[email protected]> wrote:
>> >
>> > > From what I've seen, the direct runner initiates a checkpoint after every element output.
>> >
>> > It seems like the 1-second limit kicks in before the output reaches 100 elements.
>> >
>> > I think the original purpose of having the DirectRunner use a small limit on issuing checkpoint requests was to exercise SDF better on a small data set. But it brings overhead on a larger set owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number that works for most common cases. Do you think raising the limit to 1000 elements or every 5 seconds would help?
>> >
>> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <[email protected]> wrote:
>> >>
>> >> From what I've seen, the direct runner initiates a checkpoint after every element output.
>> >>
>> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <[email protected]> wrote:
>> >>>
>> >>> Hi Antonio,
>> >>>
>> >>> Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
>> >>>
>> >>> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition as its input element, a KafkaConsumer is assigned to that topic+partition, and records are then polled continuously. The Kafka consumer will pause reading and the process fn will return when:
>> >>>
>> >>> - There are no records currently available (this is a feature of SDF: the SDF performs a self-initiated checkpoint).
>> >>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
>> >>>
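>> >>> Roughly, that loop has the following shape (a simplified sketch, not the actual ReadFromKafkaDoFn):
>> >>>
>> >>>     import org.apache.beam.sdk.io.range.OffsetRange;
>> >>>     import org.apache.beam.sdk.transforms.DoFn;
>> >>>     import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>> >>>
>> >>>     @DoFn.UnboundedPerElement
>> >>>     class ReadLoopSketch extends DoFn<String, String> {
>> >>>       @GetInitialRestriction
>> >>>       public OffsetRange initialRestriction(@Element String topicPartition) {
>> >>>         return new OffsetRange(0, Long.MAX_VALUE); // effectively unbounded
>> >>>       }
>> >>>
>> >>>       @ProcessElement
>> >>>       public ProcessContinuation process(
>> >>>           RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
>> >>>         long offset = tracker.currentRestriction().getFrom();
>> >>>         while (true) {
>> >>>           String record = pollOnce(offset); // stand-in for consumer.poll()
>> >>>           if (record == null) {
>> >>>             // Nothing available: self-initiated checkpoint, resume later.
>> >>>             return ProcessContinuation.resume();
>> >>>           }
>> >>>           // tryClaim() fails once the invoker has checkpointed the restriction
>> >>>           // (every 100 records or 1 second on the DirectRunner).
>> >>>           if (!tracker.tryClaim(offset)) {
>> >>>             return ProcessContinuation.stop();
>> >>>           }
>> >>>           out.output(record);
>> >>>           offset++;
>> >>>         }
>> >>>       }
>> >>>
>> >>>       private String pollOnce(long offset) {
>> >>>         return null; // placeholder for the real Kafka poll
>> >>>       }
>> >>>     }
>> >>>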
>> >>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint is giving you the performance regression, since there is overhead when rescheduling residuals. In your case, it looks like the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you batches of 200 elements. I want to understand what kind of performance regression you are noticing: is it slower to output the same amount of records?
>> >>>
>> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:
>> >>>>
>> >>>> Hi Boyuan,
>> >>>>
>> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.
>> >>>>
>> >>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to share some observations.
>> >>>>
>> >>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaConsumer.poll() is called it returns some records, from 1 up to 200, and then proceeds to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8ms. So in this case, the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records it needs to wait about 0.8ms before it can process the next batch.
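>> >>>>
>> >>>> Back-of-the-envelope, assuming that 0.8ms is pure poll overhead: at 200 records per poll, the poll gaps alone cap a single thread at roughly 200 / 0.0008s = 250,000 records/sec, and when poll() returns only 1 record it drops to about 1,250 records/sec on that thread.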
>> >>>>
>> >>>> Any suggestions would be appreciated.
>> >>>>
>> >>>> Hope that helps.
>> >>>>
>> >>>> Thanks and regards,
>> >>>>
>> >>>> Antonio.
>> >>>>
>> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:
>> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>> >>>> >
>> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
>> >>>> >
>> >>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different. It's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.
>> >>>> > >
>> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
>> >>>> > >
>> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.
>> >>>> > >>
>> >>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>> >>>> > >>
>> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
>> >>>> > >>
>> >>>> > >>> Hi Steve,
>> >>>> > >>>
>> >>>> > >>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will checkpoint the DoFn based on time/output limits and use timers/state to reschedule work.
>> >>>> > >>>
>> >>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
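>> >>>> > >>>
>> >>>> > >>> To put rough numbers on it: with the DirectRunner's 100-element checkpoint limit, a subscription receiving ~100,000 elements/sec (as in your case) implies on the order of 100,000 / 100 = 1,000 checkpoint-and-reschedule round trips per second, so that timer/state overhead adds up quickly.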
>> >>>> > >>>
>> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:
>> >>>> > >>>
>> >>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
>> >>>> > >>>>
>> >>>> > >>>> However, after upgrading to beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
>> >>>> > >>>>
>> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.
>> >>>> > >>>>
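>> >>>> > >>>> (For reference, the programmatic equivalent is roughly the following, if I'm remembering the ExperimentalOptions helper right:
>> >>>> > >>>>
>> >>>> > >>>>     import org.apache.beam.sdk.options.ExperimentalOptions;
>> >>>> > >>>>     import org.apache.beam.sdk.options.PipelineOptions;
>> >>>> > >>>>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> >>>> > >>>>
>> >>>> > >>>>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>> >>>> > >>>>     // Same effect as --experiments=use_deprecated_read on the command line:
>> >>>> > >>>>     ExperimentalOptions.addExperiment(
>> >>>> > >>>>         options.as(ExperimentalOptions.class), "use_deprecated_read");
>> >>>> > >>>> )
>> >>>> > >>>>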
>> >>>> > >>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
>> >>>> > >>>>
>> >>>> > >>>
>> >>>> >