Hi Boyuan,

I'm not sure I see the difference between writing a "native" SDF for the PubSub source and the UnboundedSource wrapper. With regard to the relation between reader and checkpoint, wouldn't the native implementation be in the same position?

From my point of view, the decision to close the reader is simply a matter of the reader's lifecycle. Currently, it is tightly bound to the restriction being processed, but that could be relaxed: instead of closing the reader immediately, it could be only _scheduled for closing in the future_ (using a processing-time timer, for instance), provided it is not reused in the remaining restriction after the split (by the same instance of DoFn). That is an optimization that could really make sense outside DirectRunner, because Flink, for instance, has use cases where the user really *wants* to configure quite frequent checkpoints (this relates to how Flink implements @RequiresStableInput).
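
A minimal sketch of the deferred-close idea, under stated assumptions: `DeferredCloseCache`, `park`, `reclaim` and `closeExpired` are hypothetical names and not part of Beam, the restriction is identified by an arbitrary key, and the processing-time timer is stood in for by an explicit `closeExpired(now)` call.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical helper (not a Beam API): parks a reader on checkpoint instead of closing it. */
class DeferredCloseCache<K, R extends Closeable> {

  /** A parked reader together with the time at which it should be closed. */
  private static final class Parked<T> {
    final T reader;
    final long closeAtMillis;

    Parked(T reader, long closeAtMillis) {
      this.reader = reader;
      this.closeAtMillis = closeAtMillis;
    }
  }

  private final Map<K, Parked<R>> parked = new HashMap<>();
  private final long gracePeriodMillis;

  DeferredCloseCache(long gracePeriodMillis) {
    this.gracePeriodMillis = gracePeriodMillis;
  }

  /** Called on checkpoint in place of reader.close(): the close is only scheduled. */
  void park(K restrictionKey, R reader, long nowMillis) {
    Parked<R> previous =
        parked.put(restrictionKey, new Parked<>(reader, nowMillis + gracePeriodMillis));
    if (previous != null && previous.reader != reader) {
      closeUnchecked(previous.reader); // do not leak a reader parked earlier under the same key
    }
  }

  /** Called when the residual restriction is processed; returns null if nothing is parked. */
  R reclaim(K restrictionKey) {
    Parked<R> entry = parked.remove(restrictionKey);
    return entry == null ? null : entry.reader;
  }

  /** Stand-in for the timer callback: closes readers not reclaimed in time, returns the count. */
  int closeExpired(long nowMillis) {
    int closed = 0;
    Iterator<Parked<R>> it = parked.values().iterator();
    while (it.hasNext()) {
      Parked<R> entry = it.next();
      if (entry.closeAtMillis <= nowMillis) {
        closeUnchecked(entry.reader);
        it.remove();
        closed++;
      }
    }
    return closed;
  }

  private static void closeUnchecked(Closeable reader) {
    try {
      reader.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

/** Trivial reader used only to exercise the sketch. */
class FlagReader implements Closeable {
  boolean closed = false;

  @Override
  public void close() {
    closed = true;
  }
}
```

If the residual lands on a different DoFn instance, nothing is reclaimed and the reader is closed once the grace period expires, which keeps the current behavior as the fallback.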

Jan

On 12/17/20 9:04 PM, Boyuan Zhang wrote:
Sorry for the confusion.

    Are you saying it *is* necessary to close the reader on
    checkpoint, so the only solution is to reduce checkpoint frequency?

In the PubSub on DirectRunner with SDF wrapper case, my answer is yes, based on my understanding. Closing the reader during checkpoint is an implementation detail of how the SDF wrapper wraps the Unbounded/Bounded source. It's not controlled by the DirectRunner; the only thing the DirectRunner can control is the checkpoint frequency, which is currently hardcoded. And closing the reader is the right behavior, since the work could be distributed to another instance in the real world.

The ideal solution would be to offer a way to make the frequency configurable, most likely via PipelineOptions. Or we could turn the current PubSub UnboundedSource (and other sources) into SDF. IIUC, the SDF wrapper is a migration phase from Unbounded/Bounded source to SDF. Eventually we should have every source in SDF.
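
To make the "configurable frequency" idea concrete, here is a minimal sketch of the either-limit check. `CheckpointLimits` and its methods are hypothetical names, not Beam APIs, and the wiring to PipelineOptions is deliberately left out. The default values are the hardcoded DirectRunner numbers quoted in this thread (100 elements or 1 second).

```java
import java.time.Duration;

/** Hypothetical value class (not a Beam API): checkpoint after N outputs or after a duration. */
final class CheckpointLimits {
  final int maxOutputs;
  final Duration maxDuration;

  CheckpointLimits(int maxOutputs, Duration maxDuration) {
    if (maxOutputs <= 0 || maxDuration.isZero() || maxDuration.isNegative()) {
      throw new IllegalArgumentException("both limits must be positive");
    }
    this.maxOutputs = maxOutputs;
    this.maxDuration = maxDuration;
  }

  /** The values this thread reports as hardcoded in the DirectRunner. */
  static CheckpointLimits directRunnerDefault() {
    return new CheckpointLimits(100, Duration.ofSeconds(1));
  }

  /** True once either limit is reached, whichever comes first. */
  boolean shouldCheckpoint(int outputsSoFar, Duration elapsed) {
    return outputsSoFar >= maxOutputs || elapsed.compareTo(maxDuration) >= 0;
  }
}
```

With the proposed 5 seconds or 10000 elements, `new CheckpointLimits(10000, Duration.ofSeconds(5))` would let a reader live roughly five times longer between checkpoints on a slow source.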

On Thu, Dec 17, 2020 at 11:49 AM Brian Hulette <bhule...@google.com <mailto:bhule...@google.com>> wrote:

    Boyuan your suggestion seems at odds with Jan's. Are you saying it
    *is* necessary to close the reader on checkpoint, so the only
    solution is to reduce checkpoint frequency?

    On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang <boyu...@google.com
    <mailto:boyu...@google.com>> wrote:

        Thanks for your investigation, Steve! It seems like preventing
        the checkpoint from happening so frequently would be one
        workaround for you. Making the checkpoint frequency
        configurable from pipeline option seems like the way to go.

        On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský <je...@seznam.cz
        <mailto:je...@seznam.cz>> wrote:

            Hi Steve,

            I didn't mean we should deliberately make DirectRunner
            slow, or that we should not fix performance issues if
            they can be fixed. What I meant was that if we are to
            choose between a short checkpoint time (and few elements
            processed before a checkpoint is taken) and performance,
            we should prefer better tested code, in this particular case.

            > After a bunch of debugging, I think I finally figured
            out what the problem is though. During a checkpoint (in
            trySplit), the UnboundedSourceViaSDF wrapper will close
            the current source reader and create a new one.

            That is actually a great example. The problem should be
            fixed there (the reader probably need not be closed on
            checkpoint). And it is DirectRunner that manifested this,
            due to its short checkpoint interval.

            Jan

            On 12/17/20 4:14 PM, Steve Niemitz wrote:
            > Primary purpose of DirectRunner is testing, not
            performance

            That's one argument, but it's very difficult to
            effectively test a pipeline when I need to wait 15+
            minutes for the first element to go through it.  I also
            disagree in general that we shouldn't care about the
            performance of the DirectRunner. It's likely the first
            runner new users of beam try (I know it was for us), and
            if it doesn't provide enough performance to actually run
            a representative pipeline, users may extrapolate that
            performance onto other runners (I know we did). 
            Anecdotally, the fact that the DirectRunner didn't work
            for some of our initial test pipelines (because of
            performance problems) probably delayed our adoption of
            beam by at least 6 months.

            > Steve, based on your findings, it seems like it takes
            more time for the SDF pipeline to actually start to read
            from PubSub and more time to output records.

            Pubsub reads start ~instantly, but I'm not able to see
            any elements actually output from it for a LONG time,
            sometimes 30+ minutes.  I see the reader acking back to
            pubsub, so it IS committing, but no elements output.

            After a bunch of debugging, I think I finally figured out
            what the problem is though.  During a checkpoint (in
            trySplit), the UnboundedSourceViaSDF wrapper will close
            the current source reader and create a new one.  The
            problem is, the pubsub reader needs some time to
            correctly estimate its watermark [1], and because it
            gets closed and recreated so frequently due to
            checkpointing (either number of elements, or duration),
            it can never actually provide accurate estimates, and
            always returns the min watermark.  This seems like it
            prevents some internal timers from ever firing,
            effectively holding all elements in the pipeline state. 
            I can confirm this also by looking at WatermarkManager,
            where I see all the bundles pending.

            [1]
            
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
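
The interaction described above can be illustrated with a toy model. This is not the actual Pubsub reader logic: `WarmupWatermarkReader`, the warm-up length, and the "minimum timestamp seen" estimate are all assumptions made only to show why a reader that is recreated faster than it can warm up never reports anything but the minimum watermark.

```java
/** Toy model (not the real Pubsub reader): reports a watermark only after a warm-up period. */
final class WarmupWatermarkReader {
  static final long MIN_WATERMARK = Long.MIN_VALUE;

  private final long createdAtMillis;
  private final long warmupMillis;
  private long minTimestampSeen = Long.MAX_VALUE;

  WarmupWatermarkReader(long createdAtMillis, long warmupMillis) {
    this.createdAtMillis = createdAtMillis;
    this.warmupMillis = warmupMillis;
  }

  /** Records the event timestamp of one message. */
  void observe(long eventTimestampMillis) {
    minTimestampSeen = Math.min(minTimestampSeen, eventTimestampMillis);
  }

  /** Returns MIN_WATERMARK until the reader has lived for the whole warm-up period. */
  long watermark(long nowMillis) {
    boolean warmedUp = nowMillis - createdAtMillis >= warmupMillis;
    if (!warmedUp || minTimestampSeen == Long.MAX_VALUE) {
      return MIN_WATERMARK;
    }
    return minTimestampSeen;
  }

  /**
   * Simulates a run in which the reader is closed and recreated at every checkpoint,
   * with one message arriving every 100 ms. Returns true if the watermark ever
   * moves past MIN_WATERMARK.
   */
  static boolean everAdvances(long checkpointIntervalMillis, long warmupMillis, long runMillis) {
    if (checkpointIntervalMillis <= 0) {
      throw new IllegalArgumentException("checkpoint interval must be positive");
    }
    long now = 0;
    while (now < runMillis) {
      WarmupWatermarkReader reader = new WarmupWatermarkReader(now, warmupMillis);
      long closeAt = now + checkpointIntervalMillis;
      for (; now < closeAt && now < runMillis; now += 100) {
        reader.observe(now);
        if (reader.watermark(now) != MIN_WATERMARK) {
          return true;
        }
      }
      // Checkpoint: the reader is dropped here and a fresh one starts its warm-up again.
    }
    return false;
  }
}
```

In this model a 1 second checkpoint interval with an assumed 60 second warm-up never advances the watermark, while any interval longer than the warm-up does.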

            On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                Hi Ismaël,

                what I meant by the performance vs. testing argument
                is that when
                choosing default values for certain (possibly
                configurable) options, we
                should prefer choices that result in better tested
                code, not better
                performance. DirectRunner actually does quite a few
                things that are
                suboptimal performance-wise, but are good to be done
                for test purposes
                (immutability checks, as an example).

                Regarding SDF in general, I can confirm we see some
                issues with Flink,
                most recently [1] (which I'm trying to fix right
                now). That is actually a
                correctness issue, not a performance issue. I personally
                didn't notice any
                performance issues, so far.

                Jan

                [1] https://issues.apache.org/jira/browse/BEAM-11481

                On 12/17/20 3:24 PM, Ismaël Mejía wrote:
                > The influence of checkpointing on the output of the
                results should be
                > minimal, in particular for Direct Runner. It seems
                what Steve reports
                > here is something different. Jan, have you
                or others already
                > checked the influence of this on Flink, which is now
                using this new
                > translation path?
                >
                > I think the argument that the Direct runner is
                mostly about testing
                > and not about performance is an argument that
                plays out badly for Beam;
                > one should not necessarily exclude the other.
                Direct runner is our
                > most used runner, basically every Beam user relies
                on the direct
                > runners so every regression or improvement on it
                affects everyone, but
                > well that's a subject worth its own thread.
                >
                > On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
                >> Hi,
                >>
                >> from my point of view the numbers in DirectRunner
                are set correctly. Primary purpose of DirectRunner is
                testing, not performance, so DirectRunner makes
                intentionally frequent checkpoints to easily exercise
                potential bugs in user code. It might be possible to
                make the frequency configurable, though.
                >>
                >> Jan
                >>
                >> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
                >>
                >> It's not a portable execution on DirectRunner so I
                would expect that outputs from
                OutputAndTimeBoundedSplittableProcessElementInvoker
                should be emitted immediately. For SDF execution on
                DirectRunner, the overhead could come from the SDF
                expansion, SDF wrapper and the invoker.
                >>
                >> Steve, based on your findings, it seems like it
                takes more time for the SDF pipeline to actually
                start to read from PubSub and more time to output
                records. Are you able to tell how much time each part
                is taking?
                >>
                >> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw
                <rober...@google.com <mailto:rober...@google.com>> wrote:
                >>> If all it takes is bumping these numbers up a
                bit, that seems like a reasonable thing to do ASAP.
                (I would argue that perhaps they shouldn't be static,
                e.g. it might be preferable to start emitting results
                right away, but use larger batches for the steady
                state if there are performance benefits.)
                >>>
                >>> That being said, it sounds like there's something
                deeper going on here. We should also verify that this
                performance impact is limited to the direct runner.
                >>>
                >>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz
                <sniem...@apache.org <mailto:sniem...@apache.org>> wrote:
                >>>> I tried changing my build locally to 10 seconds
                and 10,000 elements but it didn't seem to make much
                of a difference, it still takes a few minutes for
                elements to begin actually showing up to downstream
                stages from the Pubsub read.  I can see elements
                being emitted from
                OutputAndTimeBoundedSplittableProcessElementInvoker,
                and bundles being committed by
                ParDoEvaluator.finishBundle, but after that, they
                seem to just kind of disappear somewhere.
                >>>>
                >>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang
                <boyu...@google.com <mailto:boyu...@google.com>> wrote:
                >>>>> Making it a PipelineOptions setting was my other
                proposal, but it might take some time to do so. On the
                other hand, tuning the number into something
                acceptable is low-hanging fruit.
                >>>>>
                >>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía
                <ieme...@gmail.com <mailto:ieme...@gmail.com>> wrote:
                >>>>>> It sounds reasonable. I am wondering also on
                the consequence of these
                >>>>>> parameters for other runners (where it is
                every 10 seconds or 10000
                >>>>>> elements) + their own configuration e.g.
                checkpointInterval,
                >>>>>> checkpointTimeoutMillis and
                minPauseBetweenCheckpoints for Flink. It
                >>>>>> is not clear for me what would be chosen now
                in this case.
                >>>>>>
                >>>>>> I know we are a bit anti knobs but maybe it
                makes sense to make this
                >>>>>> configurable via PipelineOptions at least for
                Direct runner.
                >>>>>>
                >>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang
                <boyu...@google.com <mailto:boyu...@google.com>> wrote:
                >>>>>>> I agree, Ismael.
                >>>>>>>
                >>>>>>>  From my current investigation, the
                performance overhead should mainly come from the
                frequency of checkpoint in
                OutputAndTimeBoundedSplittableProcessElementInvoker[1],
                which is hardcoded in the DirectRunner (every 1
                second or 100 elements)[2]. I believe configuring
                these numbers on DirectRunner should improve reported
                cases so far. My last proposal was to change the
                number to every 5 seconds or 10000 elements. What do
                you think?
                >>>>>>>
                >>>>>>> [1]
                
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                >>>>>>> [2]
                
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
                >>>>>>>
                >>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía
                <ieme...@gmail.com <mailto:ieme...@gmail.com>> wrote:
                >>>>>>>> I can guess that the same issues mentioned
                here probably will affect
                >>>>>>>> the usability for people trying Beam's
                interactive SQL on Unbounded IO
                >>>>>>>> too.
                >>>>>>>>
                >>>>>>>> We should really take into account that the
                performance of the SDF
                >>>>>>>> based path should be as good or better than
                the previous version
                >>>>>>>> before considering its removal
                (--experiments=use_deprecated_read) and
                >>>>>>>> probably have consensus when this happens.
                >>>>>>>>
                >>>>>>>>
                >>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan
                Zhang <boyu...@google.com
                <mailto:boyu...@google.com>> wrote:
                >>>>>>>>>> From what I've seen, the direct runner
                initiates a checkpoint after every element output.
                >>>>>>>>> That seems like the 1 second limit kicks in
                before the output reaches 100 elements.
                >>>>>>>>>
                >>>>>>>>> I think the original purpose for
                DirectRunner to use a small limit on issuing
                checkpoint requests is for exercising SDF better in a
                small data set. But it brings overhead on a larger
                set owing to too many checkpoints. It would be ideal
                to make this limit configurable from pipeline but the
                easiest approach is that we figure out a number for
                most common cases. Do you think raising the limit to
                1000 elements or every 5 seconds would help?
                >>>>>>>>>
                >>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve
                Niemitz <sniem...@apache.org
                <mailto:sniem...@apache.org>> wrote:
                >>>>>>>>>> From what I've seen, the direct runner
                initiates a checkpoint after every element output.
                >>>>>>>>>>
                >>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan
                Zhang <boyu...@google.com
                <mailto:boyu...@google.com>> wrote:
                >>>>>>>>>>> Hi Antonio,
                >>>>>>>>>>>
                >>>>>>>>>>> Thanks for the details! Which version of
                Beam SDK are you using? And are you using
                --experiments=beam_fn_api with DirectRunner to launch
                your pipeline?
                >>>>>>>>>>>
                >>>>>>>>>>> For ReadFromKafkaDoFn.processElement(),
                it will take a Kafka topic+partition as input element
                and a KafkaConsumer will be assigned to this
                topic+partition then poll records continuously. The
                Kafka consumer will resume reading and return from
                the process fn when
                >>>>>>>>>>>
                >>>>>>>>>>> - There are no available records
                currently (this is a feature of SDF called an SDF
                self-initiated checkpoint).
                >>>>>>>>>>> - The
                OutputAndTimeBoundedSplittableProcessElementInvoker
                issues a checkpoint request to ReadFromKafkaDoFn to
                get partial results. The checkpoint frequency for
                DirectRunner is every 100 output records or every 1
                second.
                >>>>>>>>>>>
                >>>>>>>>>>> It seems like either the self-initiated
                checkpoint or DirectRunner issued checkpoint gives
                you the performance regression since there is
                overhead when rescheduling residuals. In your case,
                it's more likely that the checkpoint behavior of
                OutputAndTimeBoundedSplittableProcessElementInvoker
                gives you 200 elements a batch. I want to understand
                what kind of performance regression you are noticing?
                Is it slower to output the same amount of records?
                >>>>>>>>>>>
                >>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio
                Si <antonio...@gmail.com
                <mailto:antonio...@gmail.com>> wrote:
                >>>>>>>>>>>> Hi Boyuan,
                >>>>>>>>>>>>
                >>>>>>>>>>>> This is Antonio. I reported the
                KafkaIO.read() performance issue on the slack channel
                a few days ago.
                >>>>>>>>>>>>
                >>>>>>>>>>>> I am not sure if this is helpful, but I
                have been doing some debugging on the SDK KafkaIO
                performance issue for our pipeline and I would like
                to provide some observations.
                >>>>>>>>>>>>
                >>>>>>>>>>>> It looks like in my case the
                ReadFromKafkaDoFn.processElement() was invoked within
                the same thread and every time kafkaconsumer.poll() is
                called, it returns some records, from 1 up to 200
                records. So, it will proceed to run the pipeline
                steps. Each kafkaconsumer.poll() takes about 0.8ms.
                So, in this case, the polling and running of the
                pipeline are executed sequentially within a single
                thread. So, after processing a batch of records, it
                will need to wait for 0.8ms before it can process the
                next batch of records again.
                >>>>>>>>>>>>
                >>>>>>>>>>>> Any suggestions would be appreciated.
                >>>>>>>>>>>>
                >>>>>>>>>>>> Hope that helps.
                >>>>>>>>>>>>
                >>>>>>>>>>>> Thanks and regards,
                >>>>>>>>>>>>
                >>>>>>>>>>>> Antonio.
                >>>>>>>>>>>>
                >>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang
                <boyu...@google.com <mailto:boyu...@google.com>> wrote:
                >>>>>>>>>>>>> Opened
                https://issues.apache.org/jira/browse/BEAM-11403 for
                tracking.
                >>>>>>>>>>>>>
                >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan
                Zhang <boyu...@google.com
                <mailto:boyu...@google.com>> wrote:
                >>>>>>>>>>>>>
                >>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll
                check it out. The execution paths for
                >>>>>>>>>>>>>> UnboundedSource and SDF wrapper are
                different. It's highly possible that
                >>>>>>>>>>>>>> the regression either comes from the
                invocation path for SDF wrapper, or
                >>>>>>>>>>>>>> the implementation of SDF wrapper itself.
                >>>>>>>>>>>>>>
                >>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve
                Niemitz <sniem...@apache.org
                <mailto:sniem...@apache.org>> wrote:
                >>>>>>>>>>>>>>
                >>>>>>>>>>>>>>> Coincidentally, someone else in the
                ASF slack mentioned [1] yesterday
                >>>>>>>>>>>>>>> that they were seeing significantly
                reduced performance using KafkaIO.Read
                >>>>>>>>>>>>>>> w/ the SDF wrapper vs the unbounded
                source.  They mentioned they were using
                >>>>>>>>>>>>>>> flink 1.9.
                >>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>
                https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
                >>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan
                Zhang <boyu...@google.com
                <mailto:boyu...@google.com>> wrote:
                >>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>> Hi Steve,
                >>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>> I think the major performance
                regression comes from
                >>>>>>>>>>>>>>>>
                OutputAndTimeBoundedSplittableProcessElementInvoker[1],
                which will
                >>>>>>>>>>>>>>>> checkpoint the DoFn based on
                time/output limit and use timers/state to
                >>>>>>>>>>>>>>>> reschedule works.
                >>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>> [1]
                >>>>>>>>>>>>>>>>
                
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                >>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve
                Niemitz <sniem...@apache.org
                <mailto:sniem...@apache.org>>
                >>>>>>>>>>>>>>>> wrote:
                >>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>>> I have a pipeline that reads from
                pubsub, does some aggregation, and
                >>>>>>>>>>>>>>>>> writes to various places. 
                Previously, in older versions of beam, when
                >>>>>>>>>>>>>>>>> running this in the DirectRunner,
                messages would go through the pipeline
                >>>>>>>>>>>>>>>>> almost instantly, making it very
                easy to debug locally, etc.
                >>>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>>> However, after upgrading to beam
                2.25, I noticed that it could take on
                >>>>>>>>>>>>>>>>> the order of 5-10 minutes for
                messages to get from the pubsub read step to
                >>>>>>>>>>>>>>>>> the next step in the pipeline
                (deserializing them, etc).  The subscription
                >>>>>>>>>>>>>>>>> being read from has on the order of
                100,000 elements/sec arriving in it.
                >>>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>>> Setting
                --experiments=use_deprecated_read fixes it, and makes the
                >>>>>>>>>>>>>>>>> pipeline behave as it did before.
                >>>>>>>>>>>>>>>>>
                >>>>>>>>>>>>>>>>> It seems like the SDF
                implementation in the DirectRunner here is
                >>>>>>>>>>>>>>>>> causing some kind of issue, either
                buffering a very large amount of data
                >>>>>>>>>>>>>>>>> before emitting it in a bundle, or
                something else.  Has anyone else run
                >>>>>>>>>>>>>>>>> into this?
                >>>>>>>>>>>>>>>>>
