Sure. My ID is je-ik.

Thanks,

 Jan

On 12/21/20 8:43 PM, Boyuan Zhang wrote:
Thanks for your explanation, Jan. Now I can see what you mean here. I can try to put together a PR for this optimization. Would you mind sharing your GitHub ID with me so you can review the PR later?

On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw <rober...@google.com> wrote:

    If readers are expensive to create, this seems like an important
    (and not too difficult) optimization.

    On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi Boyuan,

        I think your analysis is correct - with one exception. It
        should be possible to reuse the reader if and only if the
        last taken CheckpointMark equals the new CheckpointMark the
        reader would be created from. But this equality holds on the
        happy path and should be satisfied for the vast majority of
        invocations, so it will spare many calls to createReader.
        Actually, the marks should differ only after recovery from a
        checkpoint, but then there should be no open reader anyway.
        So to be technically correct, we should keep the last
        CheckpointMark along with the open reader, though that might
        turn out to be unnecessary (I'm not sure about that, and I
        would definitely keep the last CheckpointMark, because it is
        better safe than sorry :))
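
        To sketch the idea (class, field and method names here are made
        up just to illustrate the caching; pretend the wrapper DoFn is
        parameterized by OutputT and CheckpointMarkT):

        // Hypothetical fields inside the wrapper DoFn - a sketch only.
        private transient UnboundedSource.UnboundedReader<OutputT> cachedReader;
        private transient CheckpointMarkT lastCheckpointMark;

        private UnboundedSource.UnboundedReader<OutputT> getOrCreateReader(
            UnboundedSource<OutputT, CheckpointMarkT> source,
            CheckpointMarkT checkpointMark,
            PipelineOptions options) throws IOException {
          if (cachedReader != null
              && lastCheckpointMark != null
              && lastCheckpointMark.equals(checkpointMark)) {
            // Happy path: the restriction matches the open reader, reuse it.
            return cachedReader;
          }
          // Marks differ (typically after recovery from checkpoint), so we
          // cannot reuse the reader - close it and create a new one.
          if (cachedReader != null) {
            cachedReader.close();
          }
          cachedReader = source.createReader(options, checkpointMark);
          lastCheckpointMark = checkpointMark;
          return cachedReader;
        }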

        Jan

        On 12/21/20 7:54 PM, Boyuan Zhang wrote:
        Hi Jan,

            it seems that what we would want is to couple the
            lifecycle of the Reader not with the restriction but with
            the particular instance of (Un)boundedSource (after being
            split). That could be done in the processing DoFn, if it
            contained a cache mapping the instance of the source to the
            (possibly null, i.e. not yet open) reader. In
            @NewTracker we could assign (or create) the reader to the
            tracker, as the tracker is created for each restriction.

            WDYT?

        I was thinking about this, but it seems like it is not
        applicable to the way UnboundedSource and UnboundedReader
        work together.
        Please correct me if I'm wrong. The UnboundedReader is
        created from the UnboundedSource per CheckpointMark[1], which
        means that for certain sources, the CheckpointMark could affect
        some attributes, like the start position of the reader when
        resuming. So a single UnboundedSource could be mapped to
        multiple readers because of different instances of
        CheckpointMark. That's also the reason why we use the
        CheckpointMark as the restriction.
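
        For reference, the factory method in question looks roughly
        like this (simplified from [1] below):

        // The reader is created per CheckpointMark, so the same source
        // instance can produce different readers when resuming from
        // different marks.
        public abstract UnboundedReader<OutputT> createReader(
            PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
            throws IOException;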

        Please let me know if I misunderstand your suggestion.

        [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78

        On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:

            Hi Boyuan,

            Sorry for my late reply. I was off for a few days.

            I didn't use DirectRunner. I am using FlinkRunner.

            We measured the number of Kafka messages that we can
            process per second.
            With Beam v2.26 with --experiments=use_deprecated_read
            and --fasterCopy=true,
            we are able to consume 13K messages per second, but with
            Beam v2.26
            without the use_deprecated_read option, we are only able
            to process 10K messages
            per second for the same pipeline.

            Thanks and regards,

            Antonio.

            On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
            > Hi Antonio,
            >
            > Thanks for the details! Which version of Beam SDK are you using?
            > And are you using --experiments=beam_fn_api with DirectRunner to
            > launch your pipeline?
            >
            > For ReadFromKafkaDoFn.processElement(), it will take a Kafka
            > topic+partition as input element and a KafkaConsumer will be
            > assigned to this topic+partition, then poll records continuously.
            > The Kafka consumer will resume reading and return from the process
            > fn when
            >
            >    - There are no available records currently (this is a feature of
            >    SDF, which we call an SDF self-initiated checkpoint; see the
            >    sketch below)
            >    - The OutputAndTimeBoundedSplittableProcessElementInvoker issues
            >    a checkpoint request to ReadFromKafkaDoFn for getting partial
            >    results. The checkpoint frequency for DirectRunner is every 100
            >    output records or every 1 second.
            >
            > It seems like either the self-initiated checkpoint or the
            > DirectRunner-issued checkpoint gives you the performance regression,
            > since there is overhead when rescheduling residuals. In your case,
            > it's more likely that the checkpoint behavior of
            > OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
            > elements per batch. I want to understand what kind of performance
            > regression you are noticing: is it slower to output the same amount
            > of records?
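            >
            > To illustrate the self-initiated checkpoint, the process function
            > follows roughly this pattern (a simplified sketch, not the exact
            > ReadFromKafkaDoFn code; the generic parameters, the consumer and
            > pollTimeout fields, and the conversion helper are assumed here):
            >
            > @ProcessElement
            > public ProcessContinuation processElement(
            >     @Element KafkaSourceDescriptor descriptor,
            >     RestrictionTracker<OffsetRange, Long> tracker,
            >     OutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> receiver) {
            >   while (true) {
            >     ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(pollTimeout);
            >     if (rawRecords.isEmpty()) {
            >       // No records available right now: return resume() so the
            >       // runner checkpoints and reschedules this restriction later.
            >       return ProcessContinuation.resume();
            >     }
            >     for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
            >       if (!tracker.tryClaim(rawRecord.offset())) {
            >         // The restriction was split (e.g. the invoker's time/output
            >         // limit was reached): stop and let the residual be rescheduled.
            >         return ProcessContinuation.stop();
            >       }
            >       receiver.output(toKafkaRecord(descriptor, rawRecord));  // hypothetical helper
            >     }
            >   }
            > }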
            >
            > On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
            >
            > > Hi Boyuan,
            > >
            > > This is Antonio. I reported the KafkaIO.read() performance issue
            > > on the slack channel a few days ago.
            > >
            > > I am not sure if this is helpful, but I have been doing some
            > > debugging on the SDK KafkaIO performance issue for our pipeline
            > > and I would like to provide some observations.
            > >
            > > It looks like in my case the ReadFromKafkaDoFn.processElement()
            > > was invoked within the same thread, and every time
            > > KafkaConsumer.poll() is called, it returns some records, from 1 up
            > > to 200 records. So, it will proceed to run the pipeline steps.
            > > Each KafkaConsumer.poll() takes about 0.8ms. So, in this case, the
            > > polling and running of the pipeline are executed sequentially
            > > within a single thread. So, after processing a batch of records,
            > > it will need to wait for 0.8ms before it can process the next
            > > batch of records again.
            > >
            > > Any suggestions would be appreciated.
            > >
            > > Hope that helps.
            > >
            > > Thanks and regards,
            > >
            > > Antonio.
            > >
            > > On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
            > > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
            > > >
            > > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
            > > >
            > > > > Thanks for the pointer, Steve! I'll check it out. The
            > > > > execution paths for UnboundedSource and the SDF wrapper are
            > > > > different. It's highly possible that the regression either
            > > > > comes from the invocation path for the SDF wrapper, or the
            > > > > implementation of the SDF wrapper itself.
            > > > >
            > > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
            > > > >
            > > > >> Coincidentally, someone else in the ASF slack mentioned [1]
            > > > >> yesterday that they were seeing significantly reduced
            > > > >> performance using KafkaIO.Read w/ the SDF wrapper vs the
            > > > >> unbounded source. They mentioned they were using flink 1.9.
            > > > >>
            > > > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
            > > > >>
            > > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
            > > > >>
            > > > >>> Hi Steve,
            > > > >>>
            > > > >>> I think the major performance regression comes from
            > > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1],
            > > > >>> which will checkpoint the DoFn based on a time/output limit
            > > > >>> and use timers/state to reschedule work.
            > > > >>>
            > > > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
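            > > > >>>
            > > > >>> Conceptually, it does something like this (a rough sketch of
            > > > >>> the idea, not the actual invoker code; the counters, limits
            > > > >>> and stopwatch fields are hypothetical):
            > > > >>>
            > > > >>> private @Nullable SplitResult<RestrictionT> maybeCheckpoint(
            > > > >>>     RestrictionTracker<RestrictionT, ?> tracker) {
            > > > >>>   numOutputs++;
            > > > >>>   if (numOutputs >= maxOutputsPerBundle
            > > > >>>       || stopwatch.elapsed().compareTo(maxBundleDuration) >= 0) {
            > > > >>>     // Take a checkpoint: the primary part of the restriction
            > > > >>>     // finishes in this bundle, and the residual is handed back
            > > > >>>     // to the runner, which reschedules it via timers/state.
            > > > >>>     return tracker.trySplit(0);
            > > > >>>   }
            > > > >>>   return null;
            > > > >>> }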
            > > > >>>
            > > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
            > > > >>>
            > > > >>>> I have a pipeline that reads from pubsub, does some
            > > > >>>> aggregation, and writes to various places.  Previously, in
            > > > >>>> older versions of beam, when running this in the
            > > > >>>> DirectRunner, messages would go through the pipeline almost
            > > > >>>> instantly, making it very easy to debug locally, etc.
            > > > >>>>
            > > > >>>> However, after upgrading to beam 2.25, I noticed that it
            > > > >>>> could take on the order of 5-10 minutes for messages to get
            > > > >>>> from the pubsub read step to the next step in the pipeline
            > > > >>>> (deserializing them, etc).  The subscription being read from
            > > > >>>> has on the order of 100,000 elements/sec arriving in it.
            > > > >>>>
            > > > >>>> Setting --experiments=use_deprecated_read fixes it, and
            > > > >>>> makes the pipeline behave as it did before.
            > > > >>>>
            > > > >>>> It seems like the SDF implementation in the DirectRunner
            > > > >>>> here is causing some kind of issue, either buffering a very
            > > > >>>> large amount of data before emitting it in a bundle, or
            > > > >>>> something else.  Has anyone else run into this?
            > > > >>>>
            > > > >>>
            > > >
            > >
            >
