I understand that this could be difficult in the current implementation; my intent was just to point out that this should be possible, even in the general case. Off the top of my head (and I didn't walk through this entirely, so please don't take me too literally), it seems that what we would want is to couple the lifecycle of the Reader not with the restriction but with the particular instance of (Un)boundedSource (after being split). That could be done in the processing DoFn, if it contained a cache mapping each source instance to its (possibly null, i.e. not yet open) reader. In @NewTracker we could assign (or create) the reader to the tracker, as the tracker is created for each restriction.
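
For illustration only, the cache idea could look roughly like the sketch below. It is self-contained plain Java and does not use Beam classes: `Reader`, `Tracker`, `ReaderCache` and the string `sourceId` key are hypothetical stand-ins, and it assumes a split source can be identified by a stable key across restrictions, which may not hold as-is in the real wrapper.

```java
import java.util.HashMap;
import java.util.Map;

final class ReaderCacheSketch {
  /** Hypothetical reader; a real one would hold e.g. an expensive connection. */
  static final class Reader {
    final String sourceId;
    boolean open = true;

    Reader(String sourceId) {
      this.sourceId = sourceId;
    }

    void close() {
      open = false;
    }
  }

  /** Hypothetical tracker: one per restriction, but it only borrows the reader. */
  static final class Tracker {
    final Reader reader;
    final long checkpoint;

    Tracker(Reader reader, long checkpoint) {
      this.reader = reader;
      this.checkpoint = checkpoint;
    }
  }

  /** Cache that would live in the processing DoFn instance (assumption). */
  static final class ReaderCache {
    private final Map<String, Reader> readers = new HashMap<>();
    int opened = 0;

    /** What a @NewTracker-like hook could do: reuse or lazily create the reader. */
    Tracker newTracker(String sourceId, long checkpoint) {
      Reader reader = readers.get(sourceId);
      if (reader == null) {
        reader = new Reader(sourceId);
        readers.put(sourceId, reader);
        opened++;
      }
      return new Tracker(reader, checkpoint);
    }

    /** What a teardown-like hook could do: close every cached reader once. */
    void closeAll() {
      for (Reader reader : readers.values()) {
        reader.close();
      }
      readers.clear();
    }
  }

  public static void main(String[] args) {
    ReaderCache cache = new ReaderCache();
    Tracker first = cache.newTracker("split-0", 0);
    Tracker second = cache.newTracker("split-0", 100); // new restriction, same source
    System.out.println("same reader: " + (first.reader == second.reader));
    System.out.println("readers opened: " + cache.opened);
    cache.closeAll();
  }
}
```

The point of the sketch is only that the tracker is created per restriction while the reader outlives it; how the cache key would survive restriction serialization is left open.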



I'm not saying that it's completely impossible to do so, but I want to explain why it's hard to apply these changes to the existing SDF wrapper.

In the current SDF UnboundedSource wrapper[1], the restriction is a <UnboundedSource, CheckpointMark> pair. The reader is bound to the UnboundedSourceAsSDFRestrictionTracker[2]. The reader is created from the CheckpointMark and is started on the first call to tracker.tryClaim(). The reader is closed when trySplit() succeeds. The DoFn only has access to the RestrictionTracker in the @ProcessElement function, which means the SDF UnboundedSource wrapper DoFn is not able to manage the reader directly throughout its lifecycle. The lifecycle of a RestrictionTracker instance is managed by the invoker (or, in portable execution, by the FnApiDoFnRunner). The RestrictionTracker is created by calling @NewTracker before the @ProcessElement function is invoked, and it is discarded when the process function finishes.
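
As a rough, self-contained model of the lifecycle described above (not the actual code in Read.java), the sketch below uses hypothetical `Reader` and `Tracker` classes and a `run` loop standing in for the runner. The reader is opened lazily on the first tryClaim() and closed on every trySplit(), so the number of readers opened grows with the number of checkpoints.

```java
import java.util.ArrayList;
import java.util.List;

final class TrackerLifecycleSketch {
  /** Hypothetical reader; counts how often it is opened and closed. */
  static final class Reader {
    static int opened = 0;
    static int closed = 0;
    long position;

    Reader(long startPosition) {
      position = startPosition;
      opened++;
    }

    long next() {
      return position++;
    }

    void close() {
      closed++;
    }
  }

  /** Hypothetical tracker for a restriction identified by a checkpointed position. */
  static final class Tracker {
    private final long checkpoint;
    private Reader reader; // stays null until the first tryClaim()

    Tracker(long checkpoint) {
      this.checkpoint = checkpoint;
    }

    /** Opens the reader on first use, then claims the next element. */
    long tryClaim() {
      if (reader == null) {
        reader = new Reader(checkpoint);
      }
      return reader.next();
    }

    /** Returns the residual checkpoint and closes the reader, if one was opened. */
    long trySplit() {
      long residual = (reader == null) ? checkpoint : reader.position;
      if (reader != null) {
        reader.close();
        reader = null;
      }
      return residual;
    }
  }

  /** Stand-in for the runner: a new tracker per process call, checkpoint every N elements. */
  static List<Long> run(int totalElements, int elementsPerCheckpoint) {
    List<Long> out = new ArrayList<>();
    long checkpoint = 0;
    while (out.size() < totalElements) {
      Tracker tracker = new Tracker(checkpoint);
      for (int i = 0; i < elementsPerCheckpoint && out.size() < totalElements; i++) {
        out.add(tracker.tryClaim());
      }
      checkpoint = tracker.trySplit();
    }
    return out;
  }

  public static void main(String[] args) {
    List<Long> out = run(250, 100);
    System.out.println(out.size() + " elements, readers opened: " + Reader.opened);
  }
}
```

In this toy model, 250 elements with a checkpoint every 100 elements go through three readers; any state a reader builds up internally is lost each time.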

It also makes sense to have the CheckpointMark as the restriction because we want to checkpoint the UnboundedSource.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L436
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L750

        I'm not sure if I see the difference between writing a
        "native" SDF for PubSub source and the UnboundedSource
        wrapper. With regards to the relation between reader and
        checkpoint, wouldn't the native implementation be at the same

    Several changes could be made specifically in a PubSub SDF
    implementation. For example, the PubSub SDF can choose not to
    respond to the checkpoint request when it thinks it's not a good
    time to do so. Besides, if the expensive connection can be bound
    to the lifecycle of the SDF instance instead of each restriction,
    the PubSub SDF implementation can choose to start the connection
    when the DoFn is started and close the connection when tearDown
    is called.

    We might not be able to do so in the SDF wrapper since it's a
    general solution for all sources, and some sources do have the
    connection bound to the restriction.

    Another workaround for using PubSub on DirectRunner might be
    using --experiments=enable_custom_pubsub_source. This flag will
    make the pipeline use a DoFn to read from PubSub instead of
    using UnboundedSource. Hope it will be helpful as well.

             Are you saying it *is* necessary to close the reader on
            checkpoint, so the only solution is to reduce checkpoint
        In the PubSub on DirectRunner with SDF wrapper case, my
        answer is yes, based on my understanding.
        Closing the reader during checkpoint is an implementation
        detail of how the SDF wrapper wraps the Unbounded/Bounded
        source. It's not controlled by the DirectRunner; the only
        thing DirectRunner can control is the checkpoint frequency,
        which is hardcoded now. And closing the reader is the right
        behavior since the work could be distributed to another
        instance in the real world.

        The ideal solution would be to offer a way to make the
        frequency configurable, most likely via PipelineOptions.
        Or we turn the current PubSub UnboundedSource (and other
        sources) implementation into SDF. IIUC, the SDF wrapper is a
        migration step from Unbounded/Bounded source to SDF.
        Eventually we should have every source in SDF.

            Boyuan your suggestion seems at odds with Jan's. Are you
            saying it *is* necessary to close the reader on
            checkpoint, so the only solution is to reduce checkpoint

                Thanks for your investigation, Steve! It seems like
                preventing the checkpoint from happening so
                frequently would be one workaround for you. Making
                the checkpoint frequency configurable via a pipeline
                option seems like the way to go.

                    I didn't mean we should deliberately make
                    DirectRunner slow, or that we should not fix
                    performance issues if they can be fixed. What I
                    meant was that if we are to choose between short
                    checkpoint time (and few elements processed
                    before checkpoint is taken) or performance, we
                    should prefer better tested code, in this
                    particular case.

                    > After a bunch of debugging, I think I finally
                    figured out what the problem is though.  During
                    a checkpoint (in trySplit), the
                    UnboundedSourceViaSDF wrapper will close the
                    current source reader and create a new one.

                    That is actually a great example. The problem
                    should be fixed there (the reader probably need
                    not be closed on checkpoint). And it is
                    DirectRunner that manifested this, due to short


                    > Primary purpose of DirectRunner is testing,
                    not performance

                    That's one argument, but it's very difficult to
                    effectively test a pipeline when I need to wait
                    15+ minutes for the first element to go through
                    it.  I also disagree in general that we
                    shouldn't care about the performance of the
                    DirectRunner.  It's likely the first runner new
                    users of beam try (I know it was for us), and
                    if it doesn't provide enough performance to
                    actually run a representative pipeline, users
                    may extrapolate that performance onto other
                    runners (I know we did). Anecdotally, the fact
                    that the DirectRunner didn't work for some of
                    our initial test pipelines (because of
                    performance problems) probably delayed our
                    adoption of beam by at least 6 months.

                    > Steve, based on your findings, it seems like
                    it takes more time for the SDF pipeline to
                    actually start to read from PubSub and more
                    time to output records.

                    Pubsub reads start ~instantly, but I'm not able
                    to see any elements actually output from it for
                    a LONG time, sometimes 30+ minutes.  I see the
                    reader acking back to pubsub, so it IS
                    committing, but no elements output.

                    After a bunch of debugging, I think I finally
                    figured out what the problem is though.  During
                    a checkpoint (in trySplit), the
                    UnboundedSourceViaSDF wrapper will close the
                    current source reader and create a new one. 
                    The problem is, the pubsub reader needs some
                    time to correctly estimate its watermark [1],
                    and because it gets closed and recreated so
                    frequently due to checkpointing (either number
                    of elements, or duration), it can never
                    actually provide accurate estimates, and always
                    returns the min watermark.  This seems like it
                    prevents some internal timers from ever firing,
                    effectively holding all elements in the
                    pipeline state.  I can confirm this also by
                    looking at WatermarkManager, where I see all
                    the bundles pending.


                        What I meant by the performance vs. testing
                        argument is that when choosing default values
                        for certain (possibly configurable) options,
                        we should prefer choices that result in better
                        tested code, not better performance.
                        DirectRunner actually does quite a few things
                        that are suboptimal performance-wise, but are
                        good to be done for test purposes
                        (immutability checks, as an example).

                        Regarding SDF in general, I can confirm we
                        see some issues with Flink, most recently [1]
                        (which I'm trying to fix right now). That is
                        actually a correctness issue, not a
                        performance issue. I personally haven't
                        noticed any performance issues so far.



                        > The influence of checkpointing on the output of the
                        > results should be minimal, in particular for Direct
                        > Runner. It seems what Steve reports here is
                        > something different. Jan, have you or others already
                        > checked the influence of this on Flink, which is now
                        > using this new translation path?
                        >
                        > I think the argument that the Direct runner is mostly
                        > about testing and not about performance is an argument
                        > that plays badly for Beam; one should not necessarily
                        > exclude the other. Direct runner is our most used
                        > runner, basically every Beam user relies on the direct
                        > runner, so every regression or improvement on it
                        > affects everyone, but well, that's a subject worth its
                        > own thread.
                        >> From my point of view the numbers in DirectRunner are
                        >> set correctly. The primary purpose of DirectRunner is
                        >> testing, not performance, so DirectRunner intentionally
                        >> makes frequent checkpoints to easily exercise potential
                        >> bugs in user code. It might be possible to make the
                        >> frequency configurable, though.
                        >>
                        >> Jan
                        >> It's not a portable execution on DirectRunner, so I
                        >> would expect that outputs should be emitted
                        >> immediately. For SDF execution on DirectRunner, the
                        >> overhead could come from the SDF expansion, the SDF
                        >> wrapper and the invoker.
                        >>
                        >> Steve, based on your findings, it seems like it takes
                        >> more time for the SDF pipeline to actually start to
                        >> read from PubSub and more time to output records. Are
                        >> you able to tell how much time each part is
                        >>> If all it takes is bumping these numbers up a bit,
                        >>> that seems like a reasonable thing to do ASAP. (I
                        >>> would argue that perhaps they shouldn't be static,
                        >>> e.g. it might be preferable to start emitting results
                        >>> right away, but use larger batches for the steady
                        >>> state if there are performance benefits.)
                        >>>
                        >>> That being said, it sounds like there's something
                        >>> deeper going on here. We should also verify that this
                        >>> performance impact is limited to the direct runner.
                        >>>> I tried changing my build locally to 10 seconds and
                        >>>> 10,000 elements but it didn't seem to make much of a
                        >>>> difference, it still takes a few minutes for elements
                        >>>> to begin actually showing up to downstream stages
                        >>>> from the Pubsub read.  I can see elements being
                        >>>> emitted from
                        >>>> and bundles being committed by
                        >>>> ParDoEvaluator.finishBundle, but after that, they
                        >>>> seem to just kind of disappear
                        >>>>> Making it a PipelineOption was another proposal of
                        >>>>> mine, but it might take some time to do so. On the
                        >>>>> other hand, tuning the numbers to something
                        >>>>> acceptable is low-hanging fruit.
                        >>>>>> It sounds reasonable. I am also wondering about the
                        >>>>>> consequence of these parameters for other runners
                        >>>>>> (where it is every 10 seconds or 10000 elements) +
                        >>>>>> their own configuration, e.g. checkpointInterval,
                        >>>>>> checkpointTimeoutMillis and
                        >>>>>> minPauseBetweenCheckpoints for Flink. It is not
                        >>>>>> clear to me what would be chosen now in this case.
                        >>>>>>
                        >>>>>> I know we are a bit anti-knobs, but maybe it makes
                        >>>>>> sense to make this configurable via PipelineOptions,
                        >>>>>> at least for Direct runner.
                        >>>>>>> From my current investigation, the performance
                        >>>>>>> overhead should mostly come from the frequency of
                        >>>>>>> checkpoint in
                        >>>>>>> which is hardcoded in the DirectRunner (every 1
                        >>>>>>> second or 100 elements)[2]. I believe configuring
                        >>>>>>> these numbers on DirectRunner should improve the
                        >>>>>>> cases reported so far. My last proposal was to
                        >>>>>>> change the numbers to every 5 seconds or 10000
                        >>>>>>> elements. What do you think?
                        >>>>>>>
                        >>>>>>> [1]
                        >>>>>>> [2]
                        >>>>>>>> I can guess that the same issues mentioned here
                        >>>>>>>> will probably affect the usability for people
                        >>>>>>>> trying Beam's interactive SQL on Unbounded IO too.
                        >>>>>>>>
                        >>>>>>>> We should really take into account that the
                        >>>>>>>> performance of the SDF based path should be as good
                        >>>>>>>> as or better than the previous version before
                        >>>>>>>> considering its removal
                        >>>>>>>> (--experiments=use_deprecated_read) and probably
                        >>>>>>>> have consensus when this
                        >>>>>>>>>>> Thanks for the details! Which version of Beam
                        >>>>>>>>>>> SDK are you using? And are you using
                        >>>>>>>>>>> --experiments=beam_fn_api with DirectRunner to
                        >>>>>>>>>>> launch your pipeline?
                        >>>>>>>>>>>
                        >>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will
                        >>>>>>>>>>> take a Kafka topic+partition as input element
                        >>>>>>>>>>> and a KafkaConsumer will be assigned to this
                        >>>>>>>>>>> topic+partition, then poll records continuously.
                        >>>>>>>>>>> The Kafka consumer will resume reading and
                        >>>>>>>>>>> return from the process fn when
                        >>>>>>>>>>>
                        >>>>>>>>>>> - There are no available records currently (this
                        >>>>>>>>>>>   is a feature of SDF which calls SDF
                        >>>>>>>>>>>   self-initiated checkpoint)
                        >>>>>>>>>>> - The
                        >>>>>>>>>>>   issues checkpoint request to ReadFromKafkaDoFn
                        >>>>>>>>>>>   for getting partial results. The checkpoint
                        >>>>>>>>>>>   frequency for DirectRunner is every 100 output
                        >>>>>>>>>>>   records or every 1 second.
                        >>>>>>>>>>>
                        >>>>>>>>>>> It seems like either the self-initiated
                        >>>>>>>>>>> checkpoint or the DirectRunner-issued checkpoint
                        >>>>>>>>>>> gives you the performance regression, since
                        >>>>>>>>>>> there is overhead when rescheduling residuals.
                        >>>>>>>>>>> In your case, it's more like that the checkpoint
                        >>>>>>>>>>> behavior of
                        >>>>>>>>>>> gives you 200 elements a batch. I want to
                        >>>>>>>>>>> understand what kind of performance regression
                        >>>>>>>>>>> you are noticing. Is it slower to output the
                        >>>>>>>>>>> same amount of records?
                        >>>>>>>>>>>> This is Antonio. I reported the KafkaIO.read()
                        >>>>>>>>>>>> performance issue on the slack channel a few
                        >>>>>>>>>>>> days ago.
                        >>>>>>>>>>>>
                        >>>>>>>>>>>> I am not sure if this is helpful, but I have
                        >>>>>>>>>>>> been doing some debugging on the SDK KafkaIO
                        >>>>>>>>>>>> performance issue for our pipeline and I would
                        >>>>>>>>>>>> like to provide some observations.
                        >>>>>>>>>>>>
                        >>>>>>>>>>>> It looks like in my case the
                        >>>>>>>>>>>> ReadFromKafkaDoFn.processElement() was invoked
                        >>>>>>>>>>>> within the same thread and every time
                        >>>>>>>>>>>> kafkaconsumer.poll() is called, it returns some
                        >>>>>>>>>>>> records, from 1 up to 200 records. So, it will
                        >>>>>>>>>>>> proceed to run the pipeline steps. Each
                        >>>>>>>>>>>> kafkaconsumer.poll() takes about 0.8ms. So, in
                        >>>>>>>>>>>> this case, the polling and running of the
                        >>>>>>>>>>>> pipeline are executed sequentially within a
                        >>>>>>>>>>>> single thread. So, after processing a batch of
                        >>>>>>>>>>>> records, it will need to wait for 0.8ms before
                        >>>>>>>>>>>> it can process the next batch of records again.
                        >>>>>>>>>>>>
                        >>>>>>>>>>>> Any suggestions would be
                        >>>>>>>>>>>>> Opened
                        >>>>>>>>>>>>> for tracking.
                        >>>>>>>>>>>>>> Steve! I'll check it out. The execution paths
                        >>>>>>>>>>>>>> for UnboundedSource and SDF wrapper are
                        >>>>>>>>>>>>>> different. It's highly possible the regression
                        >>>>>>>>>>>>>> either comes from the invocation path for SDF
                        >>>>>>>>>>>>>> wrapper, or the implementation of SDF wrapper
                        >>>>>>>>>>>>>> itself.
                        >>>>>>>>>>>>>>> Coincidentally, someone
                        else in the ASF slack mentioned [1] yesterday
                        >>>>>>>>>>>>>>> that they were seeing
                        significantly reduced performance using
                        >>>>>>>>>>>>>>> w/ the SDF wrapper vs the
                        unbounded source.  They mentioned they were
                        >>>>>>>>>>>>>>> flink 1.9.
                        >>>>>>>>>>>>>>>> I think the major
                        performance regression comes from
                        which will
                        >>>>>>>>>>>>>>>> checkpoint the DoFn based
                        on time/output limit and use timers/state to
                        >>>>>>>>>>>>>>>> reschedule works.
                        >>>>>>>>>>>>>>>> [1]
                        >>>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does
                        >>>>>>>>>>>>>>>>> some aggregation, and writes to various places.
                        >>>>>>>>>>>>>>>>> Previously, in older versions of beam, when
                        >>>>>>>>>>>>>>>>> running this in the DirectRunner, messages
                        >>>>>>>>>>>>>>>>> would go through the
                        >>>>>>>>>>>>>>>>> almost instantly, making it very easy to debug
                        >>>>>>>>>>>>>>>>> locally, etc.
                        >>>>>>>>>>>>>>>>>
                        >>>>>>>>>>>>>>>>> However, after upgrading to beam 2.25, I
                        >>>>>>>>>>>>>>>>> noticed that it could take on the order of
                        >>>>>>>>>>>>>>>>> 5-10 minutes for messages to get from the
                        >>>>>>>>>>>>>>>>> pubsub read step to the next step in the
                        >>>>>>>>>>>>>>>>> pipeline (deserializing them, etc). The
                        >>>>>>>>>>>>>>>>> being read from has on the order of 100,000
                        >>>>>>>>>>>>>>>>> elements/sec arriving in it.
                        >>>>>>>>>>>>>>>>>
                        >>>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read
                        >>>>>>>>>>>>>>>>> fixes it, and makes the pipeline behave as it
                        >>>>>>>>>>>>>>>>> did
                        >>>>>>>>>>>>>>>>>
                        >>>>>>>>>>>>>>>>> It seems like the SDF implementation in the
                        >>>>>>>>>>>>>>>>> DirectRunner here is causing some kind of
                        >>>>>>>>>>>>>>>>> issue, either buffering a very large amount of
                        >>>>>>>>>>>>>>>>> data before emitting it in a bundle, or
                        >>>>>>>>>>>>>>>>> something else. Has anyone else run into this?

