It sounds reasonable. I am also wondering about the consequences of these
parameters for other runners (where it is every 10 seconds or 10000
elements) and how they interact with each runner's own configuration,
e.g. checkpointInterval, checkpointTimeoutMillis and
minPauseBetweenCheckpoints for Flink. It is not clear to me which values
would take effect in that case.
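
For reference on the Flink side, those knobs are already settable today. A small
example, assuming FlinkPipelineOptions setters along the lines of the option names
above (the exact names, e.g. checkpointingInterval vs checkpointInterval, may differ
by version); the wrapper class and the values are only illustrative:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Example driver only, not part of Beam.
    public class FlinkCheckpointKnobs {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        // Illustrative values: snapshot every 10s, abort a checkpoint after 60s,
        // and keep at least 5s of pause between two checkpoints.
        options.setCheckpointingInterval(10_000L);
        options.setCheckpointTimeoutMillis(60_000L);
        options.setMinPauseBetweenCheckpoints(5_000L);
      }
    }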

I know we are a bit anti-knobs, but maybe it makes sense to make this
configurable via PipelineOptions, at least for the Direct runner.
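
For illustration, here is a minimal sketch of what such options could look like.
The interface and option names below are hypothetical (not an existing Beam API),
and the defaults just mirror the numbers discussed in this thread:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    /** Hypothetical options for the DirectRunner's SDF checkpoint limits. */
    public interface DirectSdfCheckpointOptions extends PipelineOptions {

      @Description("Max outputs before the DirectRunner requests an SDF checkpoint.")
      @Default.Integer(10000)
      int getSdfCheckpointMaxOutputs();
      void setSdfCheckpointMaxOutputs(int value);

      @Description("Max millis before the DirectRunner requests an SDF checkpoint.")
      @Default.Long(5000L)
      long getSdfCheckpointMaxDurationMillis();
      void setSdfCheckpointMaxDurationMillis(long value);
    }

SplittableProcessElementsEvaluatorFactory could then read these instead of the
hardcoded 100 elements / 1 second.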

On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
>
> I agree, Ismael.
>
> From my current investigation, the performance overhead mostly comes from 
> the checkpoint frequency in 
> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is hardcoded in 
> the DirectRunner (every 1 second or 100 elements)[2]. I believe making these 
> numbers configurable on the DirectRunner should improve the cases reported so 
> far. My last proposal was to change the limit to every 5 seconds or 10000 
> elements. What do you think?
>
> [1] 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> [2] 
> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>
> On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>> I would guess that the same issues mentioned here will probably also affect
>> usability for people trying Beam's interactive SQL on Unbounded IO too.
>>
>> We should really make sure that the performance of the SDF-based path is as
>> good as or better than the previous version before considering removing it
>> (--experiments=use_deprecated_read), and we should probably reach consensus
>> before that happens.
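>>
>> For completeness, that experiment can be set on the command line
>> (--experiments=use_deprecated_read) or programmatically; a small sketch using
>> the standard ExperimentalOptions interface (the wrapper class is just for the
>> example):
>>
>>     import java.util.Arrays;
>>     import org.apache.beam.sdk.options.ExperimentalOptions;
>>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>
>>     // Example driver only, not part of Beam.
>>     public class UseDeprecatedRead {
>>       public static void main(String[] args) {
>>         ExperimentalOptions options = PipelineOptionsFactory.fromArgs(args)
>>             .withValidation()
>>             .as(ExperimentalOptions.class);
>>         // Keep the pre-SDF read path while the SDF wrapper performance is investigated.
>>         options.setExperiments(Arrays.asList("use_deprecated_read"));
>>       }
>>     }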
>>
>>
>> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >
>> > > From what I've seen, the direct runner initiates a checkpoint after 
>> > > every element output.
>> > It sounds like the 1 second limit kicks in before the output reaches 100
>> > elements.
>> >
>> > I think the original purpose of the DirectRunner using a small limit on
>> > issuing checkpoint requests was to exercise SDF more thoroughly on small
>> > data sets. But it adds overhead on larger data sets because of the many
>> > checkpoints. It would be ideal to make this limit configurable from the
>> > pipeline, but the easiest approach is to pick a number that works for the
>> > most common cases. Do you think raising the limit to 1000 elements or every
>> > 5 seconds would help?
>> >
>> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
>> >>
>> >> From what I've seen, the direct runner initiates a checkpoint after every 
>> >> element output.
>> >>
>> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >>>
>> >>> Hi Antonio,
>> >>>
>> >>> Thanks for the details! Which version of the Beam SDK are you using? And
>> >>> are you using --experiments=beam_fn_api with DirectRunner to launch your
>> >>> pipeline?
>> >>>
>> >>> ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its
>> >>> input element; a KafkaConsumer is assigned to that topic+partition and then
>> >>> polls records continuously. The consumer returns from the process fn (and
>> >>> resumes reading later) when either of the following happens (see the sketch
>> >>> below):
>> >>>
>> >>> 1. There are no records currently available (this is an SDF feature known
>> >>>    as an SDF self-initiated checkpoint).
>> >>> 2. The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>> >>>    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>> >>>    checkpoint frequency for the DirectRunner is every 100 output records or
>> >>>    every 1 second.
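>> >>>
>> >>> A rough sketch of that loop (simplified, not the actual ReadFromKafkaDoFn:
>> >>> the element type, the consumer setup, and the omitted @GetInitialRestriction
>> >>> and related SDF methods are placeholders here):
>> >>>
>> >>>     import org.apache.beam.sdk.io.range.OffsetRange;
>> >>>     import org.apache.beam.sdk.transforms.DoFn;
>> >>>     import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>> >>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>> >>>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>> >>>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>> >>>
>> >>>     // Placeholder sketch, not the real Beam/KafkaIO code.
>> >>>     class SketchKafkaReadDoFn extends DoFn<String, String> {
>> >>>       private transient KafkaConsumer<String, String> consumer; // one per topic+partition
>> >>>
>> >>>       @ProcessElement
>> >>>       public ProcessContinuation processElement(
>> >>>           RestrictionTracker<OffsetRange, Long> tracker,
>> >>>           OutputReceiver<String> receiver) {
>> >>>         while (true) {
>> >>>           ConsumerRecords<String, String> records =
>> >>>               consumer.poll(java.time.Duration.ofMillis(100));
>> >>>           if (records.isEmpty()) {
>> >>>             // (1) Nothing available: SDF self-initiated checkpoint, resume later.
>> >>>             return ProcessContinuation.resume();
>> >>>           }
>> >>>           for (ConsumerRecord<String, String> record : records) {
>> >>>             if (!tracker.tryClaim(record.offset())) {
>> >>>               // (2) The runner asked for a checkpoint (e.g. the DirectRunner's
>> >>>               // 100 records / 1 second limit): stop and let the runner
>> >>>               // reschedule the residual.
>> >>>               return ProcessContinuation.stop();
>> >>>             }
>> >>>             receiver.output(record.value());
>> >>>           }
>> >>>         }
>> >>>       }
>> >>>     }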
>> >>>
>> >>> It seems like either the self-initiated checkpoints or the
>> >>> DirectRunner-issued checkpoints are causing the performance regression,
>> >>> since there is overhead in rescheduling residuals. In your case, it looks
>> >>> more like the checkpoint behavior of
>> >>> OutputAndTimeBoundedSplittableProcessElementInvoker is giving you about 200
>> >>> elements per batch. I want to understand what kind of performance regression
>> >>> you are noticing: is it slower to output the same amount of records?
>> >>>
>> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>> >>>>
>> >>>> Hi Boyuan,
>> >>>>
>> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on the 
>> >>>> slack channel a few days ago.
>> >>>>
>> >>>> I am not sure if this is helpful, but I have been doing some debugging 
>> >>>> on the SDK KafkaIO performance issue for our pipeline and I would like 
>> >>>> to provide some observations.
>> >>>>
>> >>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked
>> >>>> within the same thread, and every time kafkaconsumer.poll() is called it
>> >>>> returns some records, from 1 up to 200, and the pipeline steps then run on
>> >>>> that batch. Each kafkaconsumer.poll() takes about 0.8ms, so the polling and
>> >>>> the running of the pipeline are executed sequentially within a single
>> >>>> thread. As a result, after processing a batch of records, it needs to wait
>> >>>> about 0.8ms before it can process the next batch of records.
>> >>>>
>> >>>> Any suggestions would be appreciated.
>> >>>>
>> >>>> Hope that helps.
>> >>>>
>> >>>> Thanks and regards,
>> >>>>
>> >>>> Antonio.
>> >>>>
>> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>> >>>> >
>> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> 
>> >>>> > wrote:
>> >>>> >
>> >>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths
>> >>>> > > for UnboundedSource and the SDF wrapper are different. It's highly
>> >>>> > > possible that the regression comes either from the invocation path for
>> >>>> > > the SDF wrapper or from the implementation of the SDF wrapper itself.
>> >>>> > >
>> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> 
>> >>>> > > wrote:
>> >>>> > >
>> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1] 
>> >>>> > >> yesterday
>> >>>> > >> that they were seeing significantly reduced performance using 
>> >>>> > >> KafkaIO.Read
>> >>>> > >> w/ the SDF wrapper vs the unbounded source.  They mentioned they 
>> >>>> > >> were using
>> >>>> > >> flink 1.9.
>> >>>> > >>
>> >>>> > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>> >>>> > >>
>> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> 
>> >>>> > >> wrote:
>> >>>> > >>
>> >>>> > >>> Hi Steve,
>> >>>> > >>>
>> >>>> > >>> I think the major performance regression comes from
>> >>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which
>> >>>> > >>> checkpoints the DoFn based on a time/output limit and uses
>> >>>> > >>> timers/state to reschedule work.
>> >>>> > >>>
>> >>>> > >>> [1]
>> >>>> > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> >>>> > >>>
>> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
>> >>>> > >>> wrote:
>> >>>> > >>>
>> >>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and
>> >>>> > >>>> writes to various places. Previously, in older versions of Beam, when
>> >>>> > >>>> running this in the DirectRunner, messages would go through the
>> >>>> > >>>> pipeline almost instantly, making it very easy to debug locally, etc.
>> >>>> > >>>>
>> >>>> > >>>> However, after upgrading to Beam 2.25, I noticed that it could take on
>> >>>> > >>>> the order of 5-10 minutes for messages to get from the pubsub read
>> >>>> > >>>> step to the next step in the pipeline (deserializing them, etc.). The
>> >>>> > >>>> subscription being read from has on the order of 100,000 elements/sec
>> >>>> > >>>> arriving in it.
>> >>>> > >>>>
>> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>> >>>> > >>>> pipeline behave as it did before.
>> >>>> > >>>>
>> >>>> > >>>> It seems like the SDF implementation in the DirectRunner here is
>> >>>> > >>>> causing some kind of issue, either buffering a very large amount of
>> >>>> > >>>> data before emitting it in a bundle, or something else. Has anyone
>> >>>> > >>>> else run into this?
>> >>>> > >>>>
>> >>>> > >>>
>> >>>> >
