> From what I've seen, the direct runner initiates a checkpoint after every
> element output.
It sounds like the 1-second limit is kicking in before the output reaches 100
elements.

I think the original purpose of having DirectRunner use a small limit on
issuing checkpoint requests was to exercise SDF more thoroughly on small data
sets, but it adds overhead on larger data sets because of too many
checkpoints. Ideally this limit would be configurable from the pipeline, but
the easiest approach is to settle on a number that works for the most common
cases. Do you think raising the limit to 1000 elements, or checkpointing every
5 seconds, would help?
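For what "configurable from the pipeline" could look like, here is a rough
sketch of a hypothetical options interface (these option names do not exist in
DirectRunner today; the defaults are just the numbers proposed above):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical options interface; nothing like this is wired up yet.
    public interface DirectSdfCheckpointOptions extends PipelineOptions {

      @Description("Max outputs before the DirectRunner checkpoints an SDF process call")
      @Default.Integer(1000)
      int getSdfCheckpointMaxOutputs();
      void setSdfCheckpointMaxOutputs(int value);

      @Description("Max seconds before the DirectRunner checkpoints an SDF process call")
      @Default.Integer(5)
      int getSdfCheckpointMaxSeconds();
      void setSdfCheckpointMaxSeconds(int value);
    }

Whatever defaults we pick, the idea would be for the invoker to read these two
values instead of the hard-coded 100 elements / 1 second.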

On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:

> From what I've seen, the direct runner initiates a checkpoint after every
> element output.
>
> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>
>> Hi Antonio,
>>
>> Thanks for the details! Which version of Beam SDK are you using? And are
>> you using --experiments=beam_fn_api with DirectRunner to launch your
>> pipeline?
>>
>> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition
>> as its input element; a KafkaConsumer is assigned to that topic+partition
>> and then polls records continuously. The read will checkpoint and return
>> from the process fn (to be resumed later) when:
>>
>>    - there are currently no available records (this is the SDF
>>    self-initiated checkpoint), or
>>    - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>>    checkpoint frequency for DirectRunner is every 100 output records or
>>    every 1 second.
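To make those two return paths concrete, here is a very rough, method-level
sketch of the shape of such a process call. This is not the actual
ReadFromKafkaDoFn code: imports and the surrounding DoFn class are omitted,
and createConsumer/toKafkaRecord are hypothetical helpers.

    @ProcessElement
    public ProcessContinuation processElement(
        @Element KafkaSourceDescriptor descriptor,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<KafkaRecord<byte[], byte[]>> receiver) {
      // Hypothetical helper that builds a consumer for this topic+partition.
      try (Consumer<byte[], byte[]> consumer = createConsumer(descriptor)) {
        while (true) {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
          if (records.isEmpty()) {
            // (1) Nothing available right now: return and let the runner
            // reschedule the residual (the SDF self-initiated checkpoint).
            return ProcessContinuation.resume();
          }
          for (ConsumerRecord<byte[], byte[]> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              // (2) A runner-issued checkpoint has trimmed the restriction
              // (every ~100 outputs or ~1 second on DirectRunner), so the
              // claim fails and we stop with the partial results.
              return ProcessContinuation.stop();
            }
            receiver.output(toKafkaRecord(descriptor, record));  // hypothetical conversion
          }
        }
      }
    }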
>>
>> It seems like either the self-initiated checkpoint or the DirectRunner-issued
>> checkpoint is giving you the performance regression, since there is overhead
>> when rescheduling residuals. In your case, it's more likely that the
>> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
>> gives you 200 elements per batch. I'd like to understand what kind of
>> performance regression you are noticing. Is it slower to output the same
>> number of records?
>>
>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>>
>>> Hi Boyuan,
>>>
>>> This is Antonio. I reported the KafkaIO.read() performance issue on the
>>> slack channel a few days ago.
>>>
>>> I am not sure if this is helpful, but I have been doing some debugging
>>> on the SDK KafkaIO performance issue for our pipeline and I would like to
>>> provide some observations.
>>>
>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked
>>> on a single thread, and every time KafkaConsumer.poll() is called it
>>> returns some records, from 1 up to 200, which are then run through the
>>> pipeline steps. Each KafkaConsumer.poll() takes about 0.8 ms, so the
>>> polling and the running of the pipeline are executed sequentially within
>>> a single thread. After processing a batch of records, it has to wait
>>> about 0.8 ms before it can process the next batch.
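To put rough numbers on this (using only the figures quoted above; a
back-of-the-envelope sketch, not a measurement):

    public class PollThroughputEstimate {
      public static void main(String[] args) {
        // Assumed from the observation above, not re-measured:
        double pollMillis = 0.8;      // time spent in each KafkaConsumer.poll()
        int maxRecordsPerPoll = 200;  // upper bound of records returned per poll
        double pollsPerSecond = 1000.0 / pollMillis;                        // ~1,250
        double recordCeilingPerSecond = pollsPerSecond * maxRecordsPerPoll; // ~250,000
        System.out.printf(
            "Single-thread ceiling from poll latency alone: ~%.0f records/sec%n",
            recordCeilingPerSecond);
        // Any per-record pipeline work on the same thread pushes the real rate
        // below this ceiling, since polling and processing never overlap.
      }
    }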
>>>
>>> Any suggestions would be appreciated.
>>>
>>> Hope that helps.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>> >
>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com>
>>> wrote:
>>> >
>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths
>>> > > for UnboundedSource and the SDF wrapper are different. It's highly
>>> > > possible that the regression comes either from the invocation path for
>>> > > the SDF wrapper or from the implementation of the SDF wrapper itself.
>>> > >
>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org>
>>> wrote:
>>> > >
>>> > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
>>> > >> that they were seeing significantly reduced performance using
>>> > >> KafkaIO.Read w/ the SDF wrapper vs the unbounded source.  They
>>> > >> mentioned they were using Flink 1.9.
>>> > >>
>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>> > >>
>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com>
>>> wrote:
>>> > >>
>>> > >>> Hi Steve,
>>> > >>>
>>> > >>> I think the major performance regression comes from
>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will
>>> > >>> checkpoint the DoFn based on a time/output limit and use timers/state
>>> > >>> to reschedule the work.
>>> > >>>
>>> > >>> [1]
>>> > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>> > >>>
>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
>>> > >>> wrote:
>>> > >>>
>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation,
>>> > >>>> and writes to various places.  Previously, in older versions of
>>> > >>>> Beam, when running this in the DirectRunner, messages would go
>>> > >>>> through the pipeline almost instantly, making it very easy to
>>> > >>>> debug locally, etc.
>>> > >>>>
>>> > >>>> However, after upgrading to Beam 2.25, I noticed that it could
>>> > >>>> take on the order of 5-10 minutes for messages to get from the
>>> > >>>> pubsub read step to the next step in the pipeline (deserializing
>>> > >>>> them, etc).  The subscription being read from has on the order of
>>> > >>>> 100,000 elements/sec arriving in it.
>>> > >>>>
>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>> > >>>> pipeline behave as it did before.
>>> > >>>>
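For reference, one way to pass that experiment flag when launching from Java
(a minimal sketch; adjust to however the pipeline is actually launched):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RunWithDeprecatedRead {
      public static void main(String[] args) {
        // Forces the pre-SDF read path mentioned above; DirectRunner assumed
        // for a local run.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--runner=DirectRunner", "--experiments=use_deprecated_read")
                .create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline here ...
        pipeline.run().waitUntilFinish();
      }
    }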
>>> > >>>> It seems like the SDF implementation in the DirectRunner here is
>>> > >>>> causing some kind of issue, either buffering a very large amount
>>> > >>>> of data before emitting it in a bundle, or something else.  Has
>>> > >>>> anyone else run into this?
>>> > >>>>
>>> > >>>
>>> >
>>>
>>
