Hi Jan,

I'm not saying that it's completely impossible to do so, but I want to
explain why it's hard to apply these changes to the existing SDF wrapper.

In the current SDF UnboundedSource wrapper[1], the restriction is the
<UnboundedSource, CheckpointMark> pair. The reader is bound to the
UnboundedSourceAsSDFRestrictionTracker[2]: it is created from the
CheckpointMark and started on the first call to tracker.tryClaim(), and
it is closed when trySplit() succeeds. The DoFn only has access to the
RestrictionTracker inside the @ProcessElement function, which means the
SDF UnboundedSource wrapper DoFn cannot manage the reader directly
throughout its lifecycle. The lifecycle of a RestrictionTracker instance
is managed by the invoker (or, in portable execution, by the
FnApiDoFnRunner). The RestrictionTracker is created by calling
@NewTracker before the @ProcessElement function is invoked, and it is
discarded when the process function finishes.

It also makes sense to have the CheckpointMark as the restriction because we
want to checkpoint the UnboundedSource.
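
To make the lifecycle above concrete, here is a rough sketch in plain Java.
It is a toy model, not the actual Read.java code: ToyReader, ToyTracker and
run() are names made up for illustration, and the loop in run() only
imitates what the invoker does (a fresh tracker per process call, followed
by a checkpoint).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the reader lifecycle; these are NOT the real Beam classes. */
public class ReaderLifecycleSketch {

  /** Hypothetical stand-in for an UnboundedReader that logs its lifecycle. */
  static class ToyReader {
    final List<String> events;
    long position;

    ToyReader(long startPosition, List<String> events) {
      this.position = startPosition;
      this.events = events;
    }

    void start() { events.add("start@" + position); }
    void advance() { position++; }
    void close() { events.add("close@" + position); }
  }

  /** Hypothetical stand-in for the tracker: it, not the DoFn, owns the reader. */
  static class ToyTracker {
    private final long checkpoint;
    private final List<String> events;
    private ToyReader reader; // created lazily on the first tryClaim()

    ToyTracker(long checkpoint, List<String> events) {
      this.checkpoint = checkpoint;
      this.events = events;
    }

    boolean tryClaim() {
      if (reader == null) {
        // The reader is created from the checkpoint on the first claim.
        reader = new ToyReader(checkpoint, events);
        reader.start();
      }
      reader.advance();
      return true;
    }

    /** Returns the residual checkpoint and closes the reader, if any. */
    long trySplit() {
      long residual = (reader == null) ? checkpoint : reader.position;
      if (reader != null) {
        reader.close();
        reader = null;
      }
      return residual;
    }
  }

  /** Imitates the invoker: new tracker per process call, then a checkpoint. */
  public static List<String> run(int processCalls, int claimsPerCall) {
    List<String> events = new ArrayList<>();
    long checkpoint = 0;
    for (int i = 0; i < processCalls; i++) {
      ToyTracker tracker = new ToyTracker(checkpoint, events);
      for (int j = 0; j < claimsPerCall; j++) {
        tracker.tryClaim();
      }
      checkpoint = tracker.trySplit(); // the tracker is discarded after this
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(3, 2));
  }
}
```

In this model every process call records one start and one close, so the
number of reader restarts grows with the checkpoint frequency, and the DoFn
never holds a reference it could use to keep the reader open.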

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L436
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L750

On Thu, Dec 17, 2020 at 2:42 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Boyuan,
>
> > Several changes could be made specifically to the PubSub SDF implementation.
> For example, the PubSub SDF can choose not to respond to the checkpoint request
> when it thinks it's not a good time to do so. Besides, if the expensive
> connection can be bound to the lifecycle of the SDF instance instead of
> per restriction, the PubSub SDF implementation can choose to start the
> connection when the DoFn is started and close the connection when tearDown
> is called.
>
> Why can't the same be applied to the general case? If we think about the
> "connection" and the "reader" as two abstract objects, they have the same
> methods - namely open and close, which is what defines the scope of the
> object. I still think it should be possible to implement that
> generally.
>
> Jan
> On 12/17/20 11:19 PM, Boyuan Zhang wrote:
>
> Hi Jan, thanks for the quick followup!
>
>> I'm not sure if I see the difference between writing a "native" SDF for
>> PubSub source and the UnboundedSource wrapper. With regards to the relation
>> between reader and checkpoint, wouldn't the native implementation be at the
>> same position?
>
> Several changes could be made specifically to the PubSub SDF implementation.
> For example, the PubSub SDF can choose not to respond to the checkpoint request
> when it thinks it's not a good time to do so. Besides, if the expensive
> connection can be bound to the lifecycle of the SDF instance instead of
> per restriction, the PubSub SDF implementation can choose to start the
> connection when the DoFn is started and close the connection when tearDown
> is called.
>
> We might not be able to do so in the SDF wrapper since it's a general
> solution for all sources, and some sources do have the connection
> bound to the restriction.
>
> Another workaround for using PubSub on DirectRunner might be using
> --experiments=enable_custom_pubsub_source. This flag makes the pipeline
> use a DoFn to read from PubSub instead of using UnboundedSource. Hope that
> helps as well.
>
>
> On Thu, Dec 17, 2020 at 1:09 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Boyuan,
>>
>> I'm not sure if I see the difference between writing a "native" SDF for
>> PubSub source and the UnboundedSource wrapper. With regards to the relation
>> between reader and checkpoint, wouldn't the native implementation be at the
>> same position?
>>
>> From my point of view, the decision to close the reader is simply a matter
>> of the lifecycle of the reader. Currently, it is tightly bound to the
>> restriction being processed, but that could be relaxed, so that instead of
>> immediately closing the reader, it could be only _scheduled for closing in
>> future_ (using a processing-time timer, for instance) provided it is not
>> reused in the remaining restriction after split (by the same instance of
>> DoFn). That is an optimization that could really make sense outside
>> DirectRunner, because for instance Flink has use cases where the user really
>> *wants* to configure quite frequent checkpoints (this relates to how Flink
>> implements @RequiresStableInput).
>>
>> Jan
>> On 12/17/20 9:04 PM, Boyuan Zhang wrote:
>>
>> Sorry for the confusion.
>>
>>
>>>  Are you saying it *is* necessary to close the reader on checkpoint, so
>>> the only solution is to reduce checkpoint frequency?
>>
>> In the PubSub on DirectRunner with SDF wrapper case, my answer is yes
>> based on my understanding.
>> Closing the reader during checkpoint is an implementation detail of how
>> the SDF wrapper wraps the Unbounded/Bounded source. It's not controlled by
>> the DirectRunner; the only thing DirectRunner can control is the
>> frequency of checkpoints, which is hardcoded now. And closing the reader is
>> the right behavior since the work could be distributed to another instance
>> in the real world.
>>
>> The ideal solution would be to offer a way to make the frequency
>> configurable, most likely via PipelineOptions. Or we turn the current
>> PubSub UnboundedSource (and other source) implementations into SDFs. IIUC, the
>> SDF wrapper is a migration phase from Unbounded/Bounded source to SDF.
>> Eventually we should have every source in SDF.
>>
>> On Thu, Dec 17, 2020 at 11:49 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Boyuan your suggestion seems at odds with Jan's. Are you saying it *is*
>>> necessary to close the reader on checkpoint, so the only solution is to
>>> reduce checkpoint frequency?
>>>
>>> On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang <boyu...@google.com>
>>> wrote:
>>>
>>>> Thanks for your investigation, Steve! It seems like preventing the
>>>> checkpoint from happening so frequently would be one workaround for you.
>>>> Making the checkpoint frequency configurable via a pipeline option seems
>>>> like the way to go.
>>>>
>>>> On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Steve,
>>>>>
>>>>> I didn't mean we should deliberately make DirectRunner slow, or that we
>>>>> should not fix performance issues if they can be fixed. What I meant was that
>>>>> if we are to choose between short checkpoint time (and few elements
>>>>> processed before checkpoint is taken) or performance, we should prefer
>>>>> better tested code, in this particular case.
>>>>>
>>>>> > After a bunch of debugging, I think I finally figured out what the
>>>>> problem is though.  During a checkpoint (in trySplit), the
>>>>> UnboundedSourceViaSDF wrapper will close the current source reader and
>>>>> create a new one.
>>>>>
>>>>> That is actually a great example. The problem should be fixed there
>>>>> (the reader probably need not be closed on checkpoint). And it is
>>>>> DirectRunner that manifested this, due to its short checkpointing interval.
>>>>>
>>>>> Jan
>>>>> On 12/17/20 4:14 PM, Steve Niemitz wrote:
>>>>>
>>>>> > Primary purpose of DirectRunner is testing, not performance
>>>>>
>>>>> That's one argument, but it's very difficult to effectively test a
>>>>> pipeline when I need to wait 15+ minutes for the first element to go
>>>>> through it.  I also disagree in general that we shouldn't care about the
>>>>> performance of the DirectRunner.  It's likely the first runner new users
>>>>> of beam try (I know it was for us), and if it doesn't provide enough
>>>>> performance to actually run a representative pipeline, users may
>>>>> extrapolate that performance onto other runners (I know we did).
>>>>> Anecdotally, the fact that the DirectRunner didn't work for some of our
>>>>> initial test pipelines (because of performance problems) probably delayed
>>>>> our adoption of beam by at least 6 months.
>>>>>
>>>>> > Steve, based on your findings, it seems like it takes more time for
>>>>> the SDF pipeline to actually start to read from PubSub and more time to
>>>>> output records.
>>>>>
>>>>> Pubsub reads start ~instantly, but I'm not able to see any elements
>>>>> actually output from it for a LONG time, sometimes 30+ minutes.  I see the
>>>>> reader acking back to pubsub, so it IS committing, but no elements output.
>>>>>
>>>>> After a bunch of debugging, I think I finally figured out what the
>>>>> problem is though.  During a checkpoint (in trySplit), the
>>>>> UnboundedSourceViaSDF wrapper will close the current source reader and
>>>>> create a new one.  The problem is, the pubsub reader needs some time to
>>>>> correctly estimate its watermark [1], and because it gets closed and
>>>>> recreated so frequently due to checkpointing (either number of elements
>>>>> or duration), it can never actually provide accurate estimates, and
>>>>> always returns the min watermark.  This seems like it prevents some
>>>>> internal timers from ever firing, effectively holding all elements in
>>>>> the pipeline state.  I can confirm this also by looking at
>>>>> WatermarkManager, where I see all the bundles pending.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
>>>>>
>>>>> On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Ismaël,
>>>>>>
>>>>>> what I meant by the performance vs. testing argument is that when
>>>>>> choosing default values for certain (possibly configurable) options, we
>>>>>> should prefer choices that result in better tested code, not better
>>>>>> performance. DirectRunner actually does quite a few things that are
>>>>>> suboptimal performance-wise, but are good to do for test purposes
>>>>>> (immutability checks, as an example).
>>>>>>
>>>>>> Regarding SDF in general, I can confirm we see some issues with Flink,
>>>>>> most recently [1] (which I'm trying to fix right now). That is actually
>>>>>> a correctness issue, not a performance issue. I personally haven't
>>>>>> noticed any performance issues so far.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-11481
>>>>>>
>>>>>> On 12/17/20 3:24 PM, Ismaël Mejía wrote:
>>>>>> > The influence of checkpointing on the output of the results should be
>>>>>> > minimal, in particular for the Direct Runner. What Steve reports
>>>>>> > here seems to be something different. Jan, have you or others already
>>>>>> > checked the influence of this on Flink, which is now using this new
>>>>>> > translation path?
>>>>>> >
>>>>>> > I think the argument that the Direct runner is mostly about testing
>>>>>> > and not about performance is an argument that plays badly for Beam;
>>>>>> > one should not necessarily exclude the other. The Direct runner is our
>>>>>> > most used runner, basically every Beam user relies on the direct
>>>>>> > runner, so every regression or improvement on it affects everyone, but
>>>>>> > well, that's a subject worth its own thread.
>>>>>> >
>>>>>> > On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>> >> Hi,
>>>>>> >>
>>>>>> >> from my point of view the numbers in DirectRunner are set
>>>>>> >> correctly. Primary purpose of DirectRunner is testing, not
>>>>>> >> performance, so DirectRunner intentionally makes frequent checkpoints
>>>>>> >> to easily exercise potential bugs in user code. It might be possible
>>>>>> >> to make the frequency configurable, though.
>>>>>> >>
>>>>>> >> Jan
>>>>>> >>
>>>>>> >> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>>>>>> >>
>>>>>> >> It's not a portable execution on DirectRunner so I would expect
>>>>>> >> that outputs from OutputAndTimeBoundedSplittableProcessElementInvoker
>>>>>> >> should be emitted immediately. For SDF execution on DirectRunner, the
>>>>>> >> overhead could come from the SDF expansion, SDF wrapper and the invoker.
>>>>>> >>
>>>>>> >> Steve, based on your findings, it seems like it takes more time
>>>>>> >> for the SDF pipeline to actually start to read from PubSub and more time
>>>>>> >> to output records. Are you able to tell how much time each part is taking?
>>>>>> >>
>>>>>> >> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <
>>>>>> rober...@google.com> wrote:
>>>>>> >>> If all it takes is bumping these numbers up a bit, that seems
>>>>>> >>> like a reasonable thing to do ASAP. (I would argue that perhaps they
>>>>>> >>> shouldn't be static, e.g. it might be preferable to start emitting
>>>>>> >>> results right away, but use larger batches for the steady state if
>>>>>> >>> there are performance benefits.)
>>>>>> >>>
>>>>>> >>> That being said, it sounds like there's something deeper going on
>>>>>> >>> here. We should also verify that this performance impact is limited
>>>>>> >>> to the direct runner.
>>>>>> >>>
>>>>>> >>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <
>>>>>> sniem...@apache.org> wrote:
>>>>>> >>>> I tried changing my build locally to 10 seconds and 10,000
>>>>>> >>>> elements but it didn't seem to make much of a difference; it still
>>>>>> >>>> takes a few minutes for elements to begin actually showing up to
>>>>>> >>>> downstream stages from the Pubsub read.  I can see elements being
>>>>>> >>>> emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and
>>>>>> >>>> bundles being committed by ParDoEvaluator.finishBundle, but after
>>>>>> >>>> that, they seem to just kind of disappear somewhere.
>>>>>> >>>>
>>>>>> >>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com>
>>>>>> wrote:
>>>>>> >>>>> Making it a PipelineOption was my other proposal, but it
>>>>>> >>>>> might take some time to do so. On the other hand, tuning the number
>>>>>> >>>>> into something acceptable is low-hanging fruit.
>>>>>> >>>>>
>>>>>> >>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <
>>>>>> ieme...@gmail.com> wrote:
>>>>>> >>>>>> It sounds reasonable. I am wondering also about the consequence
>>>>>> >>>>>> of these parameters for other runners (where it is every 10 seconds
>>>>>> >>>>>> or 10000 elements) + their own configuration e.g. checkpointInterval,
>>>>>> >>>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
>>>>>> >>>>>> is not clear to me what would be chosen now in this case.
>>>>>> >>>>>>
>>>>>> >>>>>> I know we are a bit anti knobs but maybe it makes sense to make this
>>>>>> >>>>>> configurable via PipelineOptions at least for Direct runner.
>>>>>> >>>>>>
>>>>>> >>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <
>>>>>> boyu...@google.com> wrote:
>>>>>> >>>>>>> I agree, Ismael.
>>>>>> >>>>>>>
>>>>>> >>>>>>> From my current investigation, the performance overhead
>>>>>> >>>>>>> should mostly come from the frequency of checkpoints in
>>>>>> >>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is
>>>>>> >>>>>>> hardcoded in the DirectRunner (every 1 second or 100 elements)[2]. I
>>>>>> >>>>>>> believe configuring these numbers on DirectRunner should improve the
>>>>>> >>>>>>> cases reported so far. My last proposal was to change the number to
>>>>>> >>>>>>> every 5 seconds or 10000 elements. What do you think?
>>>>>> >>>>>>>
>>>>>> >>>>>>> [1]
>>>>>> >>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>> >>>>>>> [2]
>>>>>> >>>>>>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>>>>>> >>>>>>>
>>>>>> >>>>>>> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <
>>>>>> ieme...@gmail.com> wrote:
>>>>>> >>>>>>>> I can guess that the same issues mentioned here probably
>>>>>> will affect
>>>>>> >>>>>>>> the usability for people trying Beam's interactive SQL on
>>>>>> Unbounded IO
>>>>>> >>>>>>>> too.
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> We should really take into account that the performance of
>>>>>> the SDF
>>>>>> >>>>>>>> based path should be as good or better than the previous
>>>>>> version
>>>>>> >>>>>>>> before considering its removal
>>>>>> (--experiments=use_deprecated_read) and
>>>>>> >>>>>>>> probably have consensus when this happens.
>>>>>> >>>>>>>>
>>>>>> >>>>>>>>
>>>>>> >>>>>>>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <
>>>>>> boyu...@google.com> wrote:
>>>>>> >>>>>>>>>> From what I've seen, the direct runner initiates a
>>>>>> >>>>>>>>>> checkpoint after every element output.
>>>>>> >>>>>>>>> That seems like the 1 second limit kicks in before the
>>>>>> >>>>>>>>> output reaches 100 elements.
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> I think the original purpose for DirectRunner to use a
>>>>>> >>>>>>>>> small limit on issuing checkpoint requests is to exercise SDF better
>>>>>> >>>>>>>>> on a small data set. But it brings overhead on a larger set owing to
>>>>>> >>>>>>>>> too many checkpoints. It would be ideal to make this limit
>>>>>> >>>>>>>>> configurable from the pipeline, but the easiest approach is that we
>>>>>> >>>>>>>>> figure out a number for the most common cases. Do you think raising
>>>>>> >>>>>>>>> the limit to 1000 elements or every 5 seconds would help?
>>>>>> >>>>>>>>>
>>>>>> >>>>>>>>> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <
>>>>>> sniem...@apache.org> wrote:
>>>>>> >>>>>>>>>>  From what I've seen, the direct runner initiates a
>>>>>> checkpoint after every element output.
>>>>>> >>>>>>>>>>
>>>>>> >>>>>>>>>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <
>>>>>> boyu...@google.com> wrote:
>>>>>> >>>>>>>>>>> Hi Antonio,
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> Thanks for the details! Which version of Beam SDK are you
>>>>>> using? And are you using --experiments=beam_fn_api with DirectRunner to
>>>>>> launch your pipeline?
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a
>>>>>> >>>>>>>>>>> Kafka topic+partition as input element and a KafkaConsumer will be
>>>>>> >>>>>>>>>>> assigned to this topic+partition then poll records continuously. The
>>>>>> >>>>>>>>>>> Kafka consumer will resume reading and return from the process fn when
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> 1. There are no available records currently (this is a
>>>>>> >>>>>>>>>>> feature of SDF which is called SDF self-initiated checkpoint), or
>>>>>> >>>>>>>>>>> 2. The OutputAndTimeBoundedSplittableProcessElementInvoker
>>>>>> >>>>>>>>>>> issues a checkpoint request to ReadFromKafkaDoFn for getting partial
>>>>>> >>>>>>>>>>> results. The checkpoint frequency for DirectRunner is every 100 output
>>>>>> >>>>>>>>>>> records or every 1 second.
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> It seems like either the self-initiated checkpoint or the
>>>>>> >>>>>>>>>>> DirectRunner-issued checkpoint gives you the performance regression since
>>>>>> >>>>>>>>>>> there is overhead when rescheduling residuals. In your case, it's more
>>>>>> >>>>>>>>>>> likely that the checkpoint behavior of
>>>>>> >>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>>>>> >>>>>>>>>>> elements a batch. I want to understand what kind of performance
>>>>>> >>>>>>>>>>> regression you are noticing. Is it slower to output the same amount of
>>>>>> >>>>>>>>>>> records?
>>>>>> >>>>>>>>>>>
>>>>>> >>>>>>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <
>>>>>> antonio...@gmail.com> wrote:
>>>>>> >>>>>>>>>>>> Hi Boyuan,
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> This is Antonio. I reported the KafkaIO.read()
>>>>>> performance issue on the slack channel a few days ago.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> I am not sure if this is helpful, but I have been doing
>>>>>> >>>>>>>>>>>> some debugging on the SDK KafkaIO performance issue for our pipeline
>>>>>> >>>>>>>>>>>> and I would like to provide some observations.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> It looks like in my case the
>>>>>> >>>>>>>>>>>> ReadFromKafkaDoFn.processElement() was invoked within the same thread,
>>>>>> >>>>>>>>>>>> and every time kafkaconsumer.poll() is called, it returns some records,
>>>>>> >>>>>>>>>>>> from 1 up to 200 records. So, it will proceed to run the pipeline steps.
>>>>>> >>>>>>>>>>>> Each kafkaconsumer.poll() takes about 0.8ms. So, in this case, the
>>>>>> >>>>>>>>>>>> polling and running of the pipeline are executed sequentially within a
>>>>>> >>>>>>>>>>>> single thread. So, after processing a batch of records, it will need to
>>>>>> >>>>>>>>>>>> wait for 0.8ms before it can process the next batch of records again.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Any suggestions would be appreciated.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Hope that helps.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Thanks and regards,
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> Antonio.
>>>>>> >>>>>>>>>>>>
>>>>>> >>>>>>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com>
>>>>>> wrote:
>>>>>> >>>>>>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403
>>>>>> for tracking.
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <
>>>>>> boyu...@google.com> wrote:
>>>>>> >>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The
>>>>>> >>>>>>>>>>>>>> execution paths for UnboundedSource and SDF wrapper are different.
>>>>>> >>>>>>>>>>>>>> It's highly possible that the regression either comes from the
>>>>>> >>>>>>>>>>>>>> invocation path for SDF wrapper, or the implementation of SDF wrapper
>>>>>> >>>>>>>>>>>>>> itself.
>>>>>> >>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <
>>>>>> sniem...@apache.org> wrote:
>>>>>> >>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> Coincidentally, someone else in the ASF slack
>>>>>> >>>>>>>>>>>>>>> mentioned [1] yesterday that they were seeing significantly reduced
>>>>>> >>>>>>>>>>>>>>> performance using KafkaIO.Read w/ the SDF wrapper vs the unbounded
>>>>>> >>>>>>>>>>>>>>> source.  They mentioned they were using flink 1.9.
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <
>>>>>> boyu...@google.com> wrote:
>>>>>> >>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>> Hi Steve,
>>>>>> >>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>> I think the major performance regression comes from
>>>>>> >>>>>>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
>>>>>> >>>>>>>>>>>>>>>> checkpoint the DoFn based on time/output limit and use timers/state to
>>>>>> >>>>>>>>>>>>>>>> reschedule work.
>>>>>> >>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>> [1]
>>>>>> >>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>> >>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
>>>>>> sniem...@apache.org>
>>>>>> >>>>>>>>>>>>>>>> wrote:
>>>>>> >>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some aggregation, and
>>>>>> >>>>>>>>>>>>>>>>> writes to various places.  Previously, in older versions of beam, when
>>>>>> >>>>>>>>>>>>>>>>> running this in the DirectRunner, messages would go through the pipeline
>>>>>> >>>>>>>>>>>>>>>>> almost instantly, making it very easy to debug locally, etc.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it could take on
>>>>>> >>>>>>>>>>>>>>>>> the order of 5-10 minutes for messages to get from the pubsub read step to
>>>>>> >>>>>>>>>>>>>>>>> the next step in the pipeline (deserializing them, etc).  The subscription
>>>>>> >>>>>>>>>>>>>>>>> being read from has on the order of 100,000 elements/sec arriving in it.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>>>>> >>>>>>>>>>>>>>>>> pipeline behave as it did before.
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>> >>>>>>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is
>>>>>> >>>>>>>>>>>>>>>>> causing some kind of issue, either buffering a very large amount of data
>>>>>> >>>>>>>>>>>>>>>>> before emitting it in a bundle, or something else.  Has anyone else run
>>>>>> >>>>>>>>>>>>>>>>> into this?
>>>>>> >>>>>>>>>>>>>>>>>
>>>>>>
>>>>>
