Hi,

From my point of view, the numbers in DirectRunner are set correctly. The primary purpose of DirectRunner is testing, not performance, so DirectRunner intentionally makes frequent checkpoints to exercise potential bugs in user code more easily. It might be possible to make the frequency configurable, though.
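
To make the trade-off concrete, here is a minimal sketch of an "N outputs or T seconds" checkpoint bound with configurable limits. The names (CheckpointLimits, shouldCheckpoint, checkpointsPerSecond) are hypothetical and are not Beam's API, and the model is idealized: it assumes a steady input rate and ignores the cost of the checkpoint itself. The limit values are the ones discussed in this thread.

```java
import java.time.Duration;

/** Hypothetical illustration only; this is not a Beam class. */
final class CheckpointLimits {
    private final int maxOutputs;
    private final Duration maxDuration;

    CheckpointLimits(int maxOutputs, Duration maxDuration) {
        if (maxOutputs <= 0 || maxDuration.isZero() || maxDuration.isNegative()) {
            throw new IllegalArgumentException("limits must be positive");
        }
        this.maxOutputs = maxOutputs;
        this.maxDuration = maxDuration;
    }

    /** A checkpoint is requested as soon as either bound is reached. */
    boolean shouldCheckpoint(int outputsSoFar, Duration elapsed) {
        return outputsSoFar >= maxOutputs || elapsed.compareTo(maxDuration) >= 0;
    }

    /**
     * Idealized checkpoint rate for a steady input of elementsPerSecond:
     * whichever bound is hit first determines the rate.
     */
    double checkpointsPerSecond(double elementsPerSecond) {
        double byCount = elementsPerSecond / maxOutputs;
        double byTime = 1000.0 / maxDuration.toMillis();
        return Math.max(byCount, byTime);
    }
}
```

Under this model, at the roughly 100,000 elements/sec Steve reported, limits of 100 elements or 1 second give about 1,000 checkpoints per second, while 10,000 elements or 5 seconds give about 10.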

Jan

On 12/17/20 12:20 AM, Boyuan Zhang wrote:
It's not a portable execution on DirectRunner so I would expect that outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be emitted immediately. For SDF execution on DirectRunner, the overhead could come from the SDF expansion, SDF wrapper and the invoker.

Steve, based on your findings, it seems like it takes more time for the SDF pipeline to actually start to read from PubSub and more time to output records. Are you able to tell how much time each part is taking?

On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw wrote:

    If all it takes is bumping these numbers up a bit, that seems like
    a reasonable thing to do ASAP. (I would argue that perhaps they
    shouldn't be static, e.g. it might be preferable to start emitting
    results right away, but use larger batches for the steady state if
    there are performance benefits.)
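
One way to picture non-static limits is a bound that starts small, so the first results are emitted right away, and then grows toward a steady-state cap. The sketch below is only an assumption about how that could look: the class name RampingOutputLimit is hypothetical, it is not a Beam class, and the doubling policy is chosen purely for illustration.

```java
/** Hypothetical illustration only; this is not a Beam class. */
final class RampingOutputLimit {
    private final int maxLimit;
    private int currentLimit;

    RampingOutputLimit(int initialLimit, int maxLimit) {
        if (initialLimit <= 0 || maxLimit < initialLimit) {
            throw new IllegalArgumentException("need 0 < initialLimit <= maxLimit");
        }
        this.currentLimit = initialLimit;
        this.maxLimit = maxLimit;
    }

    /** Returns the output limit for the next bundle, then doubles it up to the cap. */
    int nextLimit() {
        int limit = currentLimit;
        // Widen to long before doubling so large limits cannot overflow.
        currentLimit = (int) Math.min((long) currentLimit * 2, maxLimit);
        return limit;
    }
}
```

Starting at 100 with a cap of 10,000, successive bundles would be bounded at 100, 200, 400 and so on until the cap is reached.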

    That being said, it sounds like there's something deeper going on
    here. We should also verify that this performance impact is
    limited to the direct runner.

    On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz wrote:

        I tried changing my build locally to 10 seconds and 10,000
        elements but it didn't seem to make much of a difference, it
        still takes a few minutes for elements to begin actually
        showing up to downstream stages from the Pubsub read.  I can
        see elements being emitted
        from OutputAndTimeBoundedSplittableProcessElementInvoker, and
        bundles being committed by ParDoEvaluator.finishBundle, but
        after that, they seem to just kind of disappear somewhere.

        On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang wrote:

            Making it a PipelineOptions setting was my other proposal,
            but it might take some time to do so. On the other hand,
            tuning the numbers to something acceptable is low-hanging
            fruit.

            On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía wrote:

                It sounds reasonable. I am also wondering about the
                consequences of these parameters for other runners (where
                it is every 10 seconds or 10000 elements) plus their own
                configuration, e.g. checkpointInterval,
                checkpointTimeoutMillis and minPauseBetweenCheckpoints
                for Flink. It is not clear to me what would be chosen
                in this case.

                I know we are a bit anti-knobs, but maybe it makes
                sense to make this configurable via PipelineOptions,
                at least for the Direct runner.

                On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang wrote:
                >
                > I agree, Ismael.
                >
                > From my current investigation, the performance overhead should mainly
                > come from the checkpoint frequency in
                > OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is
                > hardcoded in the DirectRunner (every 1 second or 100 elements)[2]. I
                > believe configuring these numbers on DirectRunner should improve the
                > cases reported so far. My last proposal was to change the numbers to
                > every 5 seconds or 10000 elements. What do you think?
                >
                > [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                > [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
                >
                > On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía wrote:
                >>
                >> I can guess that the same issues mentioned here will probably affect
                >> the usability for people trying Beam's interactive SQL on Unbounded IO
                >> too.
                >>
                >> We should really take into account that the performance of the SDF
                >> based path should be as good as or better than the previous version
                >> before considering its removal (--experiments=use_deprecated_read), and
                >> we should probably have consensus when this happens.
                >>
                >>
                >> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang wrote:
                >> >
                >> > > From what I've seen, the direct runner initiates a checkpoint after every element output.
                >> > It seems the 1 second limit kicks in before the output reaches 100 elements.
                >> >
                >> > I think DirectRunner originally used a small limit on issuing
                >> > checkpoint requests to exercise SDF better on small data sets, but it
                >> > adds overhead on larger sets because of too many checkpoints. It would
                >> > be ideal to make this limit configurable from the pipeline, but the
                >> > easiest approach is to figure out a number that works for most common
                >> > cases. Do you think raising the limit to 1000 elements or every 5
                >> > seconds will help?
                >> >
                >> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz wrote:
                >> >>
                >> >> From what I've seen, the direct runner initiates
                a checkpoint after every element output.
                >> >>
                >> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang wrote:
                >> >>>
                >> >>> Hi Antonio,
                >> >>>
                >> >>> Thanks for the details! Which version of Beam
                SDK are you using? And are you using
                --experiments=beam_fn_api with DirectRunner to launch
                your pipeline?
                >> >>>
                >> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
                >> >>> topic+partition as the input element, and a KafkaConsumer will be
                >> >>> assigned to this topic+partition and then poll records continuously.
                >> >>> The Kafka consumer will resume reading and return from the process fn when:
                >> >>>
                >> >>> - There are no available records currently (this is an SDF feature
                >> >>>   called an SDF self-initiated checkpoint).
                >> >>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
                >> >>>   checkpoint request to ReadFromKafkaDoFn to get partial results. The
                >> >>>   checkpoint frequency for DirectRunner is every 100 output records or
                >> >>>   every 1 second.
                >> >>>
                >> >>> It seems like either the self-initiated checkpoint or the
                >> >>> DirectRunner-issued checkpoint gives you the performance regression,
                >> >>> since there is overhead when rescheduling residuals. In your case, it
                >> >>> seems more likely that the checkpoint behavior of
                >> >>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
                >> >>> elements a batch. What kind of performance regression are you
                >> >>> noticing? Is it slower to output the same amount of records?
                >> >>>
                >> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si wrote:
                >> >>>>
                >> >>>> Hi Boyuan,
                >> >>>>
                >> >>>> This is Antonio. I reported the KafkaIO.read()
                performance issue on the slack channel a few days ago.
                >> >>>>
                >> >>>> I am not sure if this is helpful, but I have
                been doing some debugging on the SDK KafkaIO
                performance issue for our pipeline and I would like to
                provide some observations.
                >> >>>>
                >> >>>> It looks like in my case the ReadFromKafkaDoFn.processElement() was
                >> >>>> invoked within the same thread, and every time kafkaconsumer.poll() is
                >> >>>> called, it returns some records, from 1 up to 200 records. So, it will
                >> >>>> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes
                >> >>>> about 0.8ms. So, in this case, the polling and running of the pipeline
                >> >>>> are executed sequentially within a single thread. So, after processing
                >> >>>> a batch of records, it will need to wait for 0.8ms before it can
                >> >>>> process the next batch of records again.
                >> >>>>
                >> >>>> Any suggestions would be appreciated.
                >> >>>>
                >> >>>> Hope that helps.
                >> >>>>
                >> >>>> Thanks and regards,
                >> >>>>
                >> >>>> Antonio.
                >> >>>>
                >> >>>> On 2020/12/04 19:17:46, Boyuan Zhang wrote:
                >> >>>> > Opened
                https://issues.apache.org/jira/browse/BEAM-11403 for
                tracking.
                >> >>>> >
                >> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang wrote:
                >> >>>> >
                >> >>>> > > Thanks for the pointer, Steve! I'll check
                it out. The execution paths for
                >> >>>> > > UnboundedSource and SDF wrapper are
                different. It's highly possible that
                >> >>>> > > the regression either comes from the
                invocation path for SDF wrapper, or
                >> >>>> > > the implementation of SDF wrapper itself.
                >> >>>> > >
                >> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz wrote:
                >> >>>> > >
                >> >>>> > >> Coincidentally, someone else in the ASF
                slack mentioned [1] yesterday
                >> >>>> > >> that they were seeing significantly
                reduced performance using KafkaIO.Read
                >> >>>> > >> w/ the SDF wrapper vs the unbounded
                source.  They mentioned they were using
                >> >>>> > >> flink 1.9.
                >> >>>> > >>
                >> >>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
                >> >>>> > >>
                >> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang wrote:
                >> >>>> > >>
                >> >>>> > >>> Hi Steve,
                >> >>>> > >>>
                >> >>>> > >>> I think the major performance regression comes from
                >> >>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
                >> >>>> > >>> checkpoint the DoFn based on time/output limit and use timers/state to
                >> >>>> > >>> reschedule work.
                >> >>>> > >>>
                >> >>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
                >> >>>> > >>>
                >> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz wrote:
                >> >>>> > >>>
                >> >>>> > >>>> I have a pipeline that reads from
                pubsub, does some aggregation, and
                >> >>>> > >>>> writes to various places.  Previously,
                in older versions of beam, when
                >> >>>> > >>>> running this in the DirectRunner,
                messages would go through the pipeline
                >> >>>> > >>>> almost instantly, making it very easy
                to debug locally, etc.
                >> >>>> > >>>>
                >> >>>> > >>>> However, after upgrading to beam 2.25,
                I noticed that it could take on
                >> >>>> > >>>> the order of 5-10 minutes for messages
                to get from the pubsub read step to
                >> >>>> > >>>> the next step in the pipeline
                (deserializing them, etc).  The subscription
                >> >>>> > >>>> being read from has on the order of
                100,000 elements/sec arriving in it.
                >> >>>> > >>>>
                >> >>>> > >>>> Setting
                --experiments=use_deprecated_read fixes it, and makes the
                >> >>>> > >>>> pipeline behave as it did before.
                >> >>>> > >>>>
                >> >>>> > >>>> It seems like the SDF implementation in
                the DirectRunner here is
                >> >>>> > >>>> causing some kind of issue, either
                buffering a very large amount of data
                >> >>>> > >>>> before emitting it in a bundle, or
                something else.  Has anyone else run
                >> >>>> > >>>> into this?
                >> >>>> > >>>>
                >> >>>> > >>>
                >> >>>> >
