Hi Boyuan,

> Several changes could be made to the PubSub SDF implementation specifically. For example, the PubSub SDF can choose not to respond to the checkpoint request when it thinks it's not a good time to do so. Besides, if the expensive connection can be bound to the lifecycle of the SDF instance instead of per restriction, the PubSub SDF implementation can choose to start the connection when the DoFn is started and close the connection when tearDown is called.

Why can't the same be applied to the general case? If we think about the "connection" and the "reader" as two abstract objects, they have the same methods, namely open and close, which is what defines the scope of the object. I still think it should be possible to implement that generally.
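
To make the idea concrete, here is a minimal sketch of a reader whose scope is defined only by open and close, with the close deferred after a checkpoint instead of done immediately. It does not use any Beam API: ScopedReader, DeferredCloseReaderHolder and the idle timeout are hypothetical names for illustration, and the explicit nowMillis argument stands in for a processing-time timer.

```java
/** Hypothetical reader whose scope is defined only by open() and close(). */
class ScopedReader {
  private boolean open;

  void open() {
    open = true;
  }

  void close() {
    open = false;
  }

  boolean isOpen() {
    return open;
  }
}

/** Keeps one reader alive across checkpoints and closes it only after an idle timeout. */
class DeferredCloseReaderHolder {
  private final long idleTimeoutMillis;
  private ScopedReader reader;
  // Long.MAX_VALUE means "in use, no close scheduled".
  private long closeAtMillis = Long.MAX_VALUE;
  int readersOpened;

  DeferredCloseReaderHolder(long idleTimeoutMillis) {
    this.idleTimeoutMillis = idleTimeoutMillis;
  }

  /** Called when a restriction starts processing; reuses the reader if it is still open. */
  ScopedReader acquire(long nowMillis) {
    expireIfIdle(nowMillis);
    if (reader == null) {
      reader = new ScopedReader();
      reader.open();
      readersOpened++;
    }
    closeAtMillis = Long.MAX_VALUE; // cancel any scheduled close
    return reader;
  }

  /** Called on checkpoint: schedule the close instead of closing immediately. */
  void releaseOnCheckpoint(long nowMillis) {
    closeAtMillis = nowMillis + idleTimeoutMillis;
  }

  /** Stand-in for the timer firing: close the reader if nobody reused it in time. */
  void expireIfIdle(long nowMillis) {
    if (reader != null && nowMillis >= closeAtMillis) {
      reader.close();
      reader = null;
    }
  }
}
```

With this shape, a checkpoint followed quickly by processing of the residual restriction on the same instance reuses the open reader, and a new reader is created only after the idle timeout has passed.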

Jan

On 12/17/20 11:19 PM, Boyuan Zhang wrote:
Hi Jan, thanks for the quick followup!

    I'm not sure if I see the difference between writing a "native"
    SDF for PubSub source and the UnboundedSource wrapper. With
    regards to the relation between reader and checkpoint, wouldn't
    the native implementation be at the same position?

Several changes could be made to the PubSub SDF implementation specifically. For example, the PubSub SDF can choose not to respond to the checkpoint request when it thinks it's not a good time to do so. Besides, if the expensive connection can be bound to the lifecycle of the SDF instance instead of per restriction, the PubSub SDF implementation can choose to start the connection when the DoFn is started and close the connection when tearDown is called.

We might not be able to do so in the SDF wrapper, since it is a general solution for all sources, and in some sources the connection is bound to the restriction.
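
As a rough illustration of the difference between the two lifecycles, here is a small sketch in plain Java. It is not Beam code: FakeConnection, PerRestrictionReader and PerInstanceReader are hypothetical stand-ins, and setup()/teardown() only mirror the roles of the DoFn setup and teardown methods.

```java
/** Hypothetical stand-in for an expensive client connection. */
class FakeConnection {
  private boolean open;

  void open() {
    open = true;
  }

  void close() {
    open = false;
  }

  boolean isOpen() {
    return open;
  }
}

/** Connection scoped to each restriction, as a general-purpose wrapper would do. */
class PerRestrictionReader {
  int connectionsOpened;

  void processRestriction() {
    FakeConnection connection = new FakeConnection();
    connection.open();
    connectionsOpened++;
    try {
      // ... read records for this restriction ...
    } finally {
      connection.close();
    }
  }
}

/** Connection scoped to the instance, mirroring DoFn setup and teardown. */
class PerInstanceReader {
  int connectionsOpened;
  private FakeConnection connection;

  void setup() {
    connection = new FakeConnection();
    connection.open();
    connectionsOpened++;
  }

  void processRestriction() {
    if (connection == null || !connection.isOpen()) {
      throw new IllegalStateException("setup() must be called first");
    }
    // ... read records for this restriction over the shared connection ...
  }

  void teardown() {
    if (connection != null) {
      connection.close();
      connection = null;
    }
  }
}
```

Processing N restrictions opens N connections in the first variant and a single connection in the second, which is why frequent checkpoints hurt much less when the connection is tied to the instance.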

Another workaround for using PubSub on DirectRunner might be to use --experiments=enable_custom_pubsub_source. This flag makes the pipeline use a DoFn to read from PubSub instead of using UnboundedSource. Hope that helps as well.


On Thu, Dec 17, 2020 at 1:09 PM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    Hi Boyuan,

    I'm not sure if I see the difference between writing a "native"
    SDF for PubSub source and the UnboundedSource wrapper. With
    regards to the relation between reader and checkpoint, wouldn't
    the native implementation be at the same position?

    From my point of view, the decision to close the reader is simply a
    matter of the lifecycle of the reader. Currently, it is tightly bound
    to the restriction being processed, but that could be relaxed, so
    that instead of immediately closing the reader, it could be only
    _scheduled for closing in future_ (using a processing time timer for
    instance) provided it is not reused in the remaining restriction
    after split (by the same instance of DoFn). That is an
    optimization that could really make sense outside DirectRunner,
    because for instance Flink has use cases where the user really
    *wants* to configure quite frequent checkpoints (this relates to how
    Flink implements @RequiresStableInput).

    Jan

    On 12/17/20 9:04 PM, Boyuan Zhang wrote:
    Sorry for the confusion.

        Are you saying it *is* necessary to close the reader on
        checkpoint, so the only solution is to reduce checkpoint
        frequency?
    In the PubSub on DirectRunner with SDF wrapper case, my answer is
    yes based on my understanding.
    Closing the reader during checkpoint is an implementation
    detail of how the SDF wrapper wraps the Unbounded/Bounded
    source. It's not controlled by the DirectRunner and the only
    thing DirectRunner can control is the frequency of checkpoint,
    which is hardcoded now. And closing the reader is the right
    behavior since the work could be distributed to another instance
    in the real world.

    The ideal solution would be to offer a way to make the frequency
    configurable, most likely via PipelineOptions. Or we turn the
    current PubSub UnboundedSource (and other source) implementations
    into SDFs. IIUC, the SDF wrapper is a migration phase from
    Unbounded/Bounded source to SDF. Eventually we should have every
    source in SDF.

    On Thu, Dec 17, 2020 at 11:49 AM Brian Hulette
    <[email protected] <mailto:[email protected]>> wrote:

        Boyuan, your suggestion seems at odds with Jan's. Are you
        saying it *is* necessary to close the reader on checkpoint,
        so the only solution is to reduce checkpoint frequency?

        On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang
        <[email protected] <mailto:[email protected]>> wrote:

            Thanks for your investigation, Steve! It seems like
            preventing the checkpoint from happening so frequently
            would be one workaround for you. Making the checkpoint
            frequency configurable via a pipeline option seems like
            the way to go.

            On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                Hi Steve,

                I didn't mean we should deliberately make
                DirectRunner slow, or that we should not fix performance
                issues if they can be fixed. What I meant was that if we
                are to choose between short checkpoint time (and few
                elements processed before checkpoint is taken) and
                performance, we should prefer better tested code, in
                this particular case.

                > After a bunch of debugging, I think I finally
                figured out what the problem is though.  During a
                checkpoint (in trySplit), the UnboundedSourceViaSDF
                wrapper will close the current source reader and
                create a new one.

                That is actually a great example. The problem should
                be fixed there (the reader probably need not be
                closed on checkpoint). And it is DirectRunner that
                manifested this, due to short checkpointing.

                Jan

                On 12/17/20 4:14 PM, Steve Niemitz wrote:
                > Primary purpose of DirectRunner is testing, not
                performance

                That's one argument, but it's very difficult to
                effectively test a pipeline when I need to wait 15+
                minutes for the first element to go through it.  I
                also disagree in general that we shouldn't care
                about the performance of the DirectRunner.  It's
                likely the first runner new users of beam try (I
                know it was for us), and if it doesn't provide
                enough performance to actually run a
                representative pipeline, users may extrapolate that
                performance onto other runners (I know we did).
                Anecdotally, the fact that the DirectRunner didn't
                work for some of our initial test pipelines (because
                of performance problems) probably delayed our
                adoption of beam by at least 6 months.

                > Steve, based on your findings, it seems like it
                takes more time for the SDF pipeline to actually
                start to read from PubSub and more time to output
                records.

                Pubsub reads start ~instantly, but I'm not able to
                see any elements actually output from it for a LONG
                time, sometimes 30+ minutes.  I see the reader
                acking back to pubsub, so it IS committing, but no
                elements output.

                After a bunch of debugging, I think I finally
                figured out what the problem is though.  During a
                checkpoint (in trySplit), the UnboundedSourceViaSDF
                wrapper will close the current source reader and
                create a new one.  The problem is, the pubsub reader
                needs some time to correctly estimate its watermark
                [1], and because it gets closed and recreated so
                frequently due to checkpointing (either number of
                elements, or duration), it can never actually
                provide accurate estimates, and always returns the
                min watermark.  This seems like it prevents some
                internal timers from ever firing, effectively
                holding all elements in the pipeline state.  I can
                confirm this also by looking at WatermarkManager,
                where I see all the bundles pending.

                [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959

                On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    Hi Ismaël,

                    what I meant by the performance vs. testing argument
                    is that when choosing default values for certain
                    (possibly configurable) options, we should prefer
                    choices that result in better tested code, not better
                    performance. DirectRunner actually does quite a few
                    things that are suboptimal performance-wise, but are
                    good to do for test purposes (immutability checks,
                    as an example).

                    Regarding SDF in general, I can confirm we see some
                    issues with Flink, most recently [1] (which I'm trying
                    to fix right now). That is actually a correctness
                    issue, not a performance issue. I personally haven't
                    noticed any performance issues so far.

                    Jan

                    [1] https://issues.apache.org/jira/browse/BEAM-11481

                    On 12/17/20 3:24 PM, Ismaël Mejía wrote:
                    > The influence of checkpointing on the output of the
                    > results should be minimal, in particular for Direct
                    > Runner. What Steve reports here seems to be something
                    > different. Jan, have you or others already checked the
                    > influence of this on Flink, which is now using this new
                    > translation path?
                    >
                    > I think the argument that the Direct runner is mostly
                    > about testing and not about performance is an argument
                    > that plays badly for Beam; one should not necessarily
                    > exclude the other. Direct runner is our most used
                    > runner, basically every Beam user relies on the direct
                    > runner, so every regression or improvement on it affects
                    > everyone, but well, that's a subject worth its own thread.
                    >
                    > On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:
                    >> Hi,
                    >>
                    >> from my point of view the numbers in DirectRunner are
                    >> set correctly. Primary purpose of DirectRunner is
                    >> testing, not performance, so DirectRunner intentionally
                    >> makes frequent checkpoints to easily exercise potential
                    >> bugs in user code. It might be possible to make the
                    >> frequency configurable, though.
                    >>
                    >> Jan
                    >>
                    >> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
                    >>
                    >> It's not a portable execution on DirectRunner
                    so I would expect that outputs from
                    OutputAndTimeBoundedSplittableProcessElementInvoker
                    should be emitted immediately. For SDF execution
                    on DirectRunner, the overhead could come from
                    the SDF expansion, SDF wrapper and the invoker.
                    >>
                    >> Steve, based on your findings, it seems like
                    it takes more time for the SDF pipeline to
                    actually start to read from PubSub and more time
                    to output records. Are you able to tell how much
                    time each part is taking?
                    >>
                    >> On Wed, Dec 16, 2020 at 1:53 PM Robert
                    Bradshaw <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>> If all it takes is bumping these numbers up
                    a bit, that seems like a reasonable thing to do
                    ASAP. (I would argue that perhaps they shouldn't
                    be static, e.g. it might be preferable to start
                    emitting results right away, but use larger
                    batches for the steady state if there are
                    performance benefits.)
                    >>>
                    >>> That being said, it sounds like there's
                    something deeper going on here. We should also
                    verify that this performance impact is limited
                    to the direct runner.
                    >>>
                    >>> On Wed, Dec 16, 2020 at 1:36 PM Steve
                    Niemitz <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>> I tried changing my build locally to 10
                    seconds and 10,000 elements but it didn't seem
                    to make much of a difference, it still takes a
                    few minutes for elements to begin actually
                    showing up to downstream stages from the Pubsub
                    read.  I can see elements being emitted from
                    OutputAndTimeBoundedSplittableProcessElementInvoker,
                    and bundles being committed by
                    ParDoEvaluator.finishBundle, but after that,
                    they seem to just kind of disappear somewhere.
                    >>>>
                    >>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan
                    Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>> Making it a PipelineOptions setting was my other
                    >>>>> proposal, but it might take some time to do so. On the
                    >>>>> other hand, tuning the numbers into something
                    >>>>> acceptable is low-hanging fruit.
                    >>>>>
                    >>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël
                    Mejía <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>> It sounds reasonable. I am wondering also
                    on the consequence of these
                    >>>>>> parameters for other runners (where it is
                    every 10 seconds or 10000
                    >>>>>> elements) + their own configuration e.g.
                    checkpointInterval,
                    >>>>>> checkpointTimeoutMillis and
                    minPauseBetweenCheckpoints for Flink. It
                    >>>>>> is not clear for me what would be chosen
                    now in this case.
                    >>>>>>
                    >>>>>> I know we are a bit anti knobs but maybe
                    it makes sense to make this
                    >>>>>> configurable via PipelineOptions at least
                    for Direct runner.
                    >>>>>>
                    >>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan
                    Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>> I agree, Ismael.
                    >>>>>>>
                    >>>>>>> From my current investigation, the performance
                    >>>>>>> overhead should mostly come from the frequency of
                    >>>>>>> checkpoints in
                    >>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker [1],
                    >>>>>>> which is hardcoded in the DirectRunner (every 1 second
                    >>>>>>> or 100 elements) [2]. I believe configuring these
                    >>>>>>> numbers on DirectRunner should improve the cases
                    >>>>>>> reported so far. My last proposal was to change the
                    >>>>>>> numbers to every 5 seconds or 10000 elements. What do
                    >>>>>>> you think?
                    >>>>>>>
                    >>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                    >>>>>>> [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
                    >>>>>>>
                    >>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël
                    Mejía <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>> I can guess that the same issues
                    mentioned here probably will affect
                    >>>>>>>> the usability for people trying Beam's
                    interactive SQL on Unbounded IO
                    >>>>>>>> too.
                    >>>>>>>>
                    >>>>>>>> We should really take into account that
                    the performance of the SDF
                    >>>>>>>> based path should be as good or better
                    than the previous version
                    >>>>>>>> before considering its removal
                    (--experiments=use_deprecated_read) and
                    >>>>>>>> probably have consensus when this happens.
                    >>>>>>>>
                    >>>>>>>>
                    >>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan
                    Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>  From what I've seen, the direct
                    runner initiates a checkpoint after every
                    element output.
                    >>>>>>>>> That seems like the 1 second limit
                    kicks in before the output reaches 100 elements.
                    >>>>>>>>>
                    >>>>>>>>> I think the original purpose for DirectRunner to use
                    >>>>>>>>> a small limit on issuing checkpoint requests is to
                    >>>>>>>>> exercise SDF better on a small data set. But it
                    >>>>>>>>> brings overhead on a larger set owing to too many
                    >>>>>>>>> checkpoints. It would be ideal to make this limit
                    >>>>>>>>> configurable from the pipeline, but the easiest
                    >>>>>>>>> approach is to figure out a number for the most common
                    >>>>>>>>> cases. Do you think raising the limit to 1000
                    >>>>>>>>> elements or every 5 seconds will help?
                    >>>>>>>>>
                    >>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve
                    Niemitz <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>  From what I've seen, the direct
                    runner initiates a checkpoint after every
                    element output.
                    >>>>>>>>>>
                    >>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM
                    Boyuan Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>> Hi Antonio,
                    >>>>>>>>>>>
                    >>>>>>>>>>> Thanks for the details! Which
                    version of Beam SDK are you using? And are you
                    using --experiments=beam_fn_api with
                    DirectRunner to launch your pipeline?
                    >>>>>>>>>>>
                    >>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take
                    >>>>>>>>>>> a Kafka topic+partition as input element and a
                    >>>>>>>>>>> KafkaConsumer will be assigned to this
                    >>>>>>>>>>> topic+partition then poll records continuously. The
                    >>>>>>>>>>> Kafka consumer will resume reading and return from
                    >>>>>>>>>>> the process fn when
                    >>>>>>>>>>>
                    >>>>>>>>>>> - There are no available records currently (this is
                    >>>>>>>>>>>   an SDF feature called SDF self-initiated checkpoint)
                    >>>>>>>>>>> - The OutputAndTimeBoundedSplittableProcessElementInvoker
                    >>>>>>>>>>>   issues a checkpoint request to ReadFromKafkaDoFn for
                    >>>>>>>>>>>   getting partial results. The checkpoint frequency for
                    >>>>>>>>>>>   DirectRunner is every 100 output records or every
                    >>>>>>>>>>>   1 second.
                    >>>>>>>>>>>
                    >>>>>>>>>>> It seems like either the self-initiated checkpoint
                    >>>>>>>>>>> or the DirectRunner-issued checkpoint gives you the
                    >>>>>>>>>>> performance regression, since there is overhead when
                    >>>>>>>>>>> rescheduling residuals. In your case, it's more
                    >>>>>>>>>>> likely that the checkpoint behavior of
                    >>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker
                    >>>>>>>>>>> gives you 200 elements a batch. I want to understand
                    >>>>>>>>>>> what kind of performance regression you are noticing.
                    >>>>>>>>>>> Is it slower to output the same amount of records?
                    >>>>>>>>>>>
                    >>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM
                    Antonio Si <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>>> Hi Boyuan,
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> This is Antonio. I reported the
                    KafkaIO.read() performance issue on the slack
                    channel a few days ago.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> I am not sure if this is helpful,
                    but I have been doing some debugging on the SDK
                    KafkaIO performance issue for our pipeline and I
                    would like to provide some observations.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> It looks like in my case the
                    ReadFromKafkaDoFn.processElement() was invoked
                    within the same thread and every time
                    kafkaconsumer.poll() is called, it returns some
                    records, from 1 up to 200 records. So, it will
                    proceed to run the pipeline steps. Each
                    kafkaconsumer.poll() takes about 0.8ms. So, in
                    this case, the polling and running of the
                    pipeline are executed sequentially within a
                    single thread. So, after processing a batch of
                    records, it will need to wait for 0.8ms before
                    it can process the next batch of records again.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> Any suggestions would be appreciated.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> Hope that helps.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> Thanks and regards,
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> Antonio.
                    >>>>>>>>>>>>
                    >>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan
                    Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>>>> Opened
                    https://issues.apache.org/jira/browse/BEAM-11403
                    for tracking.
                    >>>>>>>>>>>>>
                    >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM
                    Boyuan Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>>>>
                    >>>>>>>>>>>>>> Thanks for the pointer, Steve!
                    I'll check it out. The execution paths for
                    >>>>>>>>>>>>>> UnboundedSource and SDF wrapper
                    are different. It's highly possible that
                    >>>>>>>>>>>>>> the regression either comes from
                    the invocation path for SDF wrapper, or
                    >>>>>>>>>>>>>> the implementation of SDF wrapper
                    itself.
                    >>>>>>>>>>>>>>
                    >>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM
                    Steve Niemitz <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>> Coincidentally, someone else in
                    the ASF slack mentioned [1] yesterday
                    >>>>>>>>>>>>>>> that they were seeing
                    significantly reduced performance using KafkaIO.Read
                    >>>>>>>>>>>>>>> w/ the SDF wrapper vs the
                    unbounded source. They mentioned they were using
                    >>>>>>>>>>>>>>> flink 1.9.
                    >>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
                    >>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM
                    Boyuan Zhang <[email protected]
                    <mailto:[email protected]>> wrote:
                    >>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>> Hi Steve,
                    >>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>> I think the major performance regression comes from
                    >>>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker [1],
                    >>>>>>>>>>>>>>>> which will checkpoint the DoFn based on time/output
                    >>>>>>>>>>>>>>>> limit and use timers/state to reschedule work.
                    >>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                    >>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM
                    Steve Niemitz <[email protected]
                    <mailto:[email protected]>>
                    >>>>>>>>>>>>>>>> wrote:
                    >>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>>> I have a pipeline that reads
                    from pubsub, does some aggregation, and
                    >>>>>>>>>>>>>>>>> writes to various places.
                    Previously, in older versions of beam, when
                    >>>>>>>>>>>>>>>>> running this in the
                    DirectRunner, messages would go through the pipeline
                    >>>>>>>>>>>>>>>>> almost instantly, making it
                    very easy to debug locally, etc.
                    >>>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>>> However, after upgrading to
                    beam 2.25, I noticed that it could take on
                    >>>>>>>>>>>>>>>>> the order of 5-10 minutes for
                    messages to get from the pubsub read step to
                    >>>>>>>>>>>>>>>>> the next step in the pipeline
                    (deserializing them, etc).  The subscription
                    >>>>>>>>>>>>>>>>> being read from has on the
                    order of 100,000 elements/sec arriving in it.
                    >>>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>>> Setting
                    --experiments=use_deprecated_read fixes it, and
                    makes the
                    >>>>>>>>>>>>>>>>> pipeline behave as it did before.
                    >>>>>>>>>>>>>>>>>
                    >>>>>>>>>>>>>>>>> It seems like the SDF
                    implementation in the DirectRunner here is
                    >>>>>>>>>>>>>>>>> causing some kind of issue,
                    either buffering a very large amount of data
                    >>>>>>>>>>>>>>>>> before emitting it in a
                    bundle, or something else.  Has anyone else run
                    >>>>>>>>>>>>>>>>> into this?
                    >>>>>>>>>>>>>>>>>
