Boyuan your suggestion seems at odds with Jan's. Are you saying it *is*
necessary to close the reader on checkpoint, so the only solution is to
reduce checkpoint frequency?

On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang <[email protected]> wrote:

> Thanks for your investigation, Steve! It seems like preventing the
> checkpoint from happening so frequently would be one workaround for you.
> Making the checkpoint frequency configurable from pipeline option seems
> like the way to go.
>
> On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Steve,
>>
>> I didn't mean we should deliberately make DirectRunner slow, or we should
>> not fix performance issues, if can be fixed. What I meant was that if we
>> are to choose between short checkpoint time (and few elements processed
>> before checkpoint is taken) or performance, we should prefer better tested
>> code, in this particular case.
>>
>> > After a bunch of debugging, I think I finally figured out what the
>> problem is though.  During a checkpoint (in trySplit), the
>> UnboundedSourceViaSDF wrapper will close the current source reader and
>> create a new one.
>>
>> That is actually a great example. The problem should be fixed there (the
>> reader probably need not to be closed on checkpoint). And it is
>> DirectRunner that manifested this, due to short checkpointing.
>>
>> Jan
>> On 12/17/20 4:14 PM, Steve Niemitz wrote:
>>
>> > Primary purpose of DirectRunner is testing, not performance
>>
>> That's one argument, but it's very difficult to effectively test a
>> pipeline when I need to wait 15+ minutes for the first element to go
>> through it.  I also, disagree in general that we shouldn't care about the
>> performance of the DirectRunner.  It's likely the first runner new users of
>> beam try (I know it was for us), and if it doesn't provide enough
>> performance to actually run a representative pipeline, users may
>> extrapolate that performance onto other runners (I know we did).
>> Anecdotally, the fact that the DirectRunner didn't work for some of our
>> initial test pipelines (because of performance problems) probably delayed
>> our adoption of beam by at least 6 months.
>>
>> > Steve, based on your findings, it seems like it takes more time for the
>> SDF pipeline to actually start to read from PubSub and more time to output
>> records.
>>
>> Pubsub reads start ~instantly. but I'm not able to see any elements
>> actually output from it for a LONG time, sometimes 30+ minutes.  I see the
>> reader acking back to pubsub, so it IS committing, but no elements output.
>>
>> After a bunch of debugging, I think I finally figured out what the
>> problem is though.  During a checkpoint (in trySplit), the
>> UnboundedSourceViaSDF wrapper will close the current source reader and
>> create a new one.  The problem is, the pubsub reader needs some time to
>> correctly estimate it's watermark [1], and because it gets closed and
>> recreated so frequently due to checkpointing (either number of elements, or
>> duration), it can never actually provide accurate estimates, and always
>> returns the min watermark.  This seems like it prevents some internal
>> timers from ever firing, effectively holding all elements in the pipeline
>> state.  I can confirm this also by looking at WatermarkManager, where I see
>> all the bundles pending.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
>>
>> On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Ismaël,
>>>
>>> what I meant by the performance vs. testing argument is that when
>>> choosing default values for certain (possibly configurable) options, we
>>> should prefer choices that result in better tested code, not better
>>> performance. DirectRunner actually does quite many things that are
>>> suboptimal performance-wise, but are good to be done for test purposes
>>> (immutability checks, as an example).
>>>
>>> Regarding SDF in general, I can confirm we see some issues with Flink,
>>> most recently [1] (which I'm trying to fix right now). That is actually
>>> correctness, not performance issue. I personally didn't notice any
>>> performance issues, so far.
>>>
>>> Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-11481
>>>
>>> On 12/17/20 3:24 PM, Ismaël Mejía wrote:
>>> > The influence of checkpointing on the output of the results should be
>>> > minimal in particular for Direct Runner. It seems what Steve reports
>>> > here seems to be something different. Jan have you or others already
>>> > checked the influence of this on Flink who is now using this new
>>> > translation path?
>>> >
>>> > I think the argument that the Direct runner is mostly about testing
>>> > and not about performance is an argument that is playing bad on Beam,
>>> > one should not necessarily exclude the other. Direct runner is our
>>> > most used runner, basically every Beam user relies on the direct
>>> > runners so every regression or improvement on it affects everyone, but
>>> > well that's a subject worth its own thread.
>>> >
>>> > On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <[email protected]> wrote:
>>> >> Hi,
>>> >>
>>> >> from my point of view the number in DirectRunner are set correctly.
>>> Primary purpose of DirectRunner is testing, not performance, so
>>> DirectRunner makes intentionally frequent checkpoints to easily exercise
>>> potential bugs in user code. It might be possible to make the frequency
>>> configurable, though.
>>> >>
>>> >> Jan
>>> >>
>>> >> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>> >>
>>> >> It's not a portable execution on DirectRunner so I would expect that
>>> outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be
>>> emitted immediately. For SDF execution on DirectRunner, the overhead could
>>> come from the SDF expansion, SDF wrapper and the invoker.
>>> >>
>>> >> Steve, based on your findings, it seems like it takes more time for
>>> the SDF pipeline to actually start to read from PubSub and more time to
>>> output records. Are you able to tell how much time each part is taking?
>>> >>
>>> >> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>> >>> If all it takes is bumping these numbers up a bit, that seems like a
>>> reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be
>>> static, e.g. it might be preferable to start emitting results right away,
>>> but use larger batches for the steady state if there are performance
>>> benefits.)
>>> >>>
>>> >>> That being said, it sounds like there's something deeper going on
>>> here. We should also verify that this performance impact is limited to the
>>> direct runner.
>>> >>>
>>> >>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <[email protected]>
>>> wrote:
>>> >>>> I tried changing my build locally to 10 seconds and 10,000 elements
>>> but it didn't seem to make much of a difference, it still takes a few
>>> minutes for elements to begin actually showing up to downstream stages from
>>> the Pubsub read.  I can see elements being emitted from
>>> OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being
>>> committed by ParDoEvaluator.finishBundle, but after that, they seem to just
>>> kind of disappear somewhere.
>>> >>>>
>>> >>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <[email protected]>
>>> wrote:
>>> >>>>> Making it as the PipelineOptions was my another proposal but it
>>> might take some time to do so. On the other hand, tuning the number into
>>> something acceptable is low-hanging fruit.
>>> >>>>>
>>> >>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <[email protected]>
>>> wrote:
>>> >>>>>> It sounds reasonable. I am wondering also on the consequence of
>>> these
>>> >>>>>> parameters for other runners (where it is every 10 seconds or
>>> 10000
>>> >>>>>> elements) + their own configuration e.g. checkpointInterval,
>>> >>>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink.
>>> It
>>> >>>>>> is not clear for me what would be chosen now in this case.
>>> >>>>>>
>>> >>>>>> I know we are a bit anti knobs but maybe it makes sense to make
>>> this
>>> >>>>>> configurable via PipelineOptions at least for Direct runner.
>>> >>>>>>
>>> >>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <[email protected]>
>>> wrote:
>>> >>>>>>> I agree, Ismael.
>>> >>>>>>>
>>> >>>>>>>  From my current investigation, the performance overhead should
>>> majorly come from the frequency of checkpoint in
>>> OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
>>> in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
>>> configuring these numbers on DirectRunner should improve reported cases so
>>> far. My last proposal was to change the number to every 5 seconds or 10000
>>> elements. What do you think?
>>> >>>>>>>
>>> >>>>>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>> >>>>>>> [2]
>>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>>> >>>>>>>
>>> >>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <[email protected]>
>>> wrote:
>>> >>>>>>>> I can guess that the same issues mentioned here probably will
>>> affect
>>> >>>>>>>> the usability for people trying Beam's interactive SQL on
>>> Unbounded IO
>>> >>>>>>>> too.
>>> >>>>>>>>
>>> >>>>>>>> We should really take into account that the performance of the
>>> SDF
>>> >>>>>>>> based path should be as good or better than the previous version
>>> >>>>>>>> before considering its removal
>>> (--experiments=use_deprecated_read) and
>>> >>>>>>>> probably have consensus when this happens.
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <
>>> [email protected]> wrote:
>>> >>>>>>>>>>  From what I've seen, the direct runner initiates a
>>> checkpoint after every element output.
>>> >>>>>>>>> That seems like the 1 second limit kicks in before the output
>>> reaches 100 elements.
>>> >>>>>>>>>
>>> >>>>>>>>> I think the original purpose for DirectRunner to use a small
>>> limit on issuing checkpoint requests is for exercising SDF better in a
>>> small data set. But it brings overhead on a larger set owing to too many
>>> checkpoints. It would be ideal to make this limit configurable from
>>> pipeline but the easiest approach is that we figure out a number for most
>>> common cases. Do you think we raise the limit to 1000 elements or every 5
>>> seconds will help?
>>> >>>>>>>>>
>>> >>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <
>>> [email protected]> wrote:
>>> >>>>>>>>>>  From what I've seen, the direct runner initiates a
>>> checkpoint after every element output.
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <
>>> [email protected]> wrote:
>>> >>>>>>>>>>> Hi Antonio,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks for the details! Which version of Beam SDK are you
>>> using? And are you using --experiments=beam_fn_api with DirectRunner to
>>> launch your pipeline?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>>> topic+partition as input element and a KafkaConsumer will be assigned to
>>> this topic+partition then poll records continuously. The Kafka consumer
>>> will resume reading and return from the process fn when
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> There are no available records currently(this is a feature
>>> of SDF which calls SDF self-initiated checkpoint)
>>> >>>>>>>>>>> The OutputAndTimeBoundedSplittableProcessElementInvoker
>>> issues checkpoint request to ReadFromKafkaDoFn for getting partial results.
>>> The checkpoint frequency for DirectRunner is every 100 output records or
>>> every 1 seconds.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> It seems like either the self-initiated checkpoint or
>>> DirectRunner issued checkpoint gives you the performance regression since
>>> there is overhead when rescheduling residuals. In your case, it's more like
>>> that the checkpoint behavior of
>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200 elements
>>> a batch. I want to understand what kind of performance regression you are
>>> noticing? Is it slower to output the same amount of records?
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <
>>> [email protected]> wrote:
>>> >>>>>>>>>>>> Hi Boyuan,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> This is Antonio. I reported the KafkaIO.read() performance
>>> issue on the slack channel a few days ago.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I am not sure if this is helpful, but I have been doing
>>> some debugging on the SDK KafkaIO performance issue for our pipeline and I
>>> would like to provide some observations.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> It looks like in my case the
>>> ReadFromKafkaDoFn.processElement()  was invoked within the same thread and
>>> every time kafaconsumer.poll() is called, it returns some records, from 1
>>> up to 200 records. So, it will proceed to run the pipeline steps. Each
>>> kafkaconsumer.poll() takes about 0.8ms. So, in this case, the polling and
>>> running of the pipeline are executed sequentially within a single thread.
>>> So, after processing a batch of records, it will need to wait for 0.8ms
>>> before it can process the next batch of records again.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Any suggestions would be appreciated.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Hope that helps.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks and regards,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Antonio.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]>
>>> wrote:
>>> >>>>>>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403
>>> for tracking.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <
>>> [email protected]> wrote:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The
>>> execution paths for
>>> >>>>>>>>>>>>>> UnboundedSource and SDF wrapper are different. It's
>>> highly possible that
>>> >>>>>>>>>>>>>> the regression either comes from the invocation path for
>>> SDF wrapper, or
>>> >>>>>>>>>>>>>> the implementation of SDF wrapper itself.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <
>>> [email protected]> wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned
>>> [1] yesterday
>>> >>>>>>>>>>>>>>> that they were seeing significantly reduced performance
>>> using KafkaIO.Read
>>> >>>>>>>>>>>>>>> w/ the SDF wrapper vs the unbounded source.  They
>>> mentioned they were using
>>> >>>>>>>>>>>>>>> flink 1.9.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <
>>> [email protected]> wrote:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Hi Steve,
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I think the major performance regression comes from
>>> >>>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
>>> which will
>>> >>>>>>>>>>>>>>>> checkpoint the DoFn based on time/output limit and use
>>> timers/state to
>>> >>>>>>>>>>>>>>>> reschedule works.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> [1]
>>> >>>>>>>>>>>>>>>>
>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
>>> [email protected]>
>>> >>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some
>>> aggregation, and
>>> >>>>>>>>>>>>>>>>> writes to various places.  Previously, in older
>>> versions of beam, when
>>> >>>>>>>>>>>>>>>>> running this in the DirectRunner, messages would go
>>> through the pipeline
>>> >>>>>>>>>>>>>>>>> almost instantly, making it very easy to debug
>>> locally, etc.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that
>>> it could take on
>>> >>>>>>>>>>>>>>>>> the order of 5-10 minutes for messages to get from the
>>> pubsub read step to
>>> >>>>>>>>>>>>>>>>> the next step in the pipeline (deserializing them,
>>> etc).  The subscription
>>> >>>>>>>>>>>>>>>>> being read from has on the order of 100,000
>>> elements/sec arriving in it.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it,
>>> and makes the
>>> >>>>>>>>>>>>>>>>> pipeline behave as it did before.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> It seems like the SDF implementation in the
>>> DirectRunner here is
>>> >>>>>>>>>>>>>>>>> causing some kind of issue, either buffering a very
>>> large amount of data
>>> >>>>>>>>>>>>>>>>> before emitting it in a bundle, or something else.
>>> Has anyone else run
>>> >>>>>>>>>>>>>>>>> into this?
>>> >>>>>>>>>>>>>>>>>
>>>
>>

Reply via email to