Hi Antonio,

I have one more question about your Kafka experiment on FlinkRunner: what is
the checkpoint interval for your Flink application?

The reason I ask is that, IIUC, creating connections in Kafka should be
really cheap, so I would imagine the overhead here is different from the
PubSub case. In Flink, the checkpoint frequency for SDF is configured as
10000 elements or 10 seconds (note that this checkpoint is not the same
concept as a Flink checkpoint). With the UnboundedSource implementation, the
frequency of source checkpoints depends on the Flink checkpoint frequency.
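
To make that policy concrete, here is a minimal, hypothetical sketch of the
bound check (not Beam's actual code; the class and member names are
illustrative):

  import java.time.Duration;
  import java.time.Instant;

  class SdfCheckpointPolicy {
    // Assumed bounds, mirroring the 10000-element / 10-second policy above.
    static final int MAX_ELEMENTS = 10_000;
    static final Duration MAX_TIME = Duration.ofSeconds(10);

    // Returns true once either bound is hit, signaling the SDF to
    // checkpoint and reschedule the remainder of its work.
    boolean shouldCheckpoint(long elementsOutput, Instant processingStart) {
      return elementsOutput >= MAX_ELEMENTS
          || Duration.between(processingStart, Instant.now())
              .compareTo(MAX_TIME) >= 0;
    }
  }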

On Mon, Dec 21, 2020 at 1:16 PM Jan Lukavský <je...@seznam.cz> wrote:

> Sure. My ID is je-ik.
>
> Thanks,
>
>  Jan
> On 12/21/20 8:43 PM, Boyuan Zhang wrote:
>
> Thanks for your explanation, Jan. Now I can see what you mean here. I can
> try to put up a PR for such an optimization. Would you like to share your
> GitHub ID with me so you can review the PR later?
>
> On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> If readers are expensive to create, this seems like an important (and not
>> too difficult) optimization.
>>
>> On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Boyuan,
>>>
>>> I think your analysis is correct, with one exception. It should be
>>> possible to reuse the reader if and only if the last taken CheckpointMark
>>> equals the new CheckpointMark the reader would be created from. But this
>>> equality is the happy path and should hold for the vast majority of
>>> invocations, so it would spare many calls to createReader. Actually, the
>>> marks should differ only after recovery from a checkpoint, but then there
>>> should be no open reader anyway. So to be technically correct, we should
>>> keep the last CheckpointMark along with the open reader, though that might
>>> turn out to be unnecessary (I'm not sure about that, and I would
>>> definitely keep the last CheckpointMark, because better safe than
>>> sorry :))
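>>>
>>> A rough sketch of the idea (hypothetical names, not actual Beam code;
>>> assumes the source's CheckpointMark implements equals()):
>>>
>>>   import java.io.IOException;
>>>   import java.util.Objects;
>>>   import org.apache.beam.sdk.io.UnboundedSource;
>>>   import org.apache.beam.sdk.options.PipelineOptions;
>>>
>>>   class ReaderCache<T, MarkT extends UnboundedSource.CheckpointMark> {
>>>     private UnboundedSource.UnboundedReader<T> reader;
>>>     private MarkT lastMark;
>>>
>>>     UnboundedSource.UnboundedReader<T> readerFor(
>>>         UnboundedSource<T, MarkT> source, MarkT mark,
>>>         PipelineOptions options) throws IOException {
>>>       if (reader != null && Objects.equals(lastMark, mark)) {
>>>         return reader;  // happy path: reuse the open reader
>>>       }
>>>       if (reader != null) {
>>>         reader.close();  // marks differ, e.g. after restore from checkpoint
>>>       }
>>>       reader = source.createReader(options, mark);
>>>       // The owner would refresh lastMark whenever a new mark is taken.
>>>       lastMark = mark;
>>>       return reader;
>>>     }
>>>   }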
>>>
>>> Jan
>>> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>>>
>>> Hi Jan,
>>>>
>>>> it seems that what we would want is to couple the lifecycle of the
>>>> Reader not with the restriction but with the particular instance of
>>>> (Un)boundedSource (after being split). That could be done in the
>>>> processing DoFn, if it contained a cache mapping an instance of the
>>>> source to the (possibly null, i.e. not yet open) reader. In @NewTracker
>>>> we could assign (or create) the reader to the tracker, as the tracker is
>>>> created for each restriction.
>>>>
>>>> WDYT?
>>>>
>>> I was thinking about this, but it seems it is not applicable to the way
>>> UnboundedSource and UnboundedReader work together.
>>> Please correct me if I'm wrong. The UnboundedReader is created from the
>>> UnboundedSource per CheckpointMark [1], which means that for certain
>>> sources the CheckpointMark can affect attributes such as the start
>>> position of the reader when resuming. So a single UnboundedSource could
>>> be mapped to multiple readers because of different instances of
>>> CheckpointMark. That's also the reason why we use CheckpointMark as the
>>> restriction.
>>>
>>> Please let me know if I misunderstand your suggestion.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
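>>>
>>> For reference, the factory method at [1] has this shape (paraphrased from
>>> the linked source; CheckpointMarkT is the source's own mark type):
>>>
>>>   public abstract UnboundedReader<OutputT> createReader(
>>>       PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
>>>       throws IOException;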
>>>
>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:
>>>
>>>> Hi Boyuan,
>>>>
>>>> Sorry for my late reply. I was off for a few days.
>>>>
>>>> I didn't use DirectRunner. I am using FlinkRunner.
>>>>
>>>> We measured the number of Kafka messages that we can process per second.
>>>> With Beam v2.26, with --experiments=use_deprecated_read and
>>>> --fasterCopy=true, we are able to consume 13K messages per second, but
>>>> with Beam v2.26 without the use_deprecated_read option, we are only able
>>>> to process 10K messages per second for the same pipeline.
>>>>
>>>> Thanks and regards,
>>>>
>>>> Antonio.
>>>>
>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
>>>> > Hi Antonio,
>>>> >
>>>> > Thanks for the details! Which version of Beam SDK are you using? And
>>>> > are you using --experiments=beam_fn_api with DirectRunner to launch
>>>> > your pipeline?
>>>> >
>>>> > For ReadFromKafkaDoFn.processElement(), it takes a Kafka
>>>> > topic+partition as its input element; a KafkaConsumer is assigned to
>>>> > this topic+partition and then polls records continuously. The Kafka
>>>> > consumer will resume reading and return from the process fn when:
>>>> >
>>>> >    - There are no records currently available (this is the SDF
>>>> >    self-initiated checkpoint), or
>>>> >    - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>>> >    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>>>> >    checkpoint frequency for DirectRunner is every 100 output records
>>>> >    or every 1 second. (A simplified sketch of this loop follows the
>>>> >    list.)
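>>>> >
>>>> > A simplified, hypothetical sketch of that processElement loop (not the
>>>> > real ReadFromKafkaDoFn; names, the tracker type, and the output type
>>>> > are illustrative, and the usual Kafka client / Beam imports are
>>>> > assumed):
>>>> >
>>>> >   while (true) {
>>>> >     ConsumerRecords<byte[], byte[]> records =
>>>> >         consumer.poll(java.time.Duration.ofSeconds(1));
>>>> >     if (records.isEmpty()) {
>>>> >       // No records available: SDF self-initiated checkpoint.
>>>> >       return ProcessContinuation.resume();
>>>> >     }
>>>> >     for (ConsumerRecord<byte[], byte[]> record : records) {
>>>> >       if (!tracker.tryClaim(record.offset())) {
>>>> >         // Runner-issued checkpoint (e.g. the 100-records/1-second
>>>> >         // bound above): stop and let the runner reschedule the rest.
>>>> >         return ProcessContinuation.stop();
>>>> >       }
>>>> >       receiver.output(record);
>>>> >     }
>>>> >   }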
>>>> >
>>>> > It seems like either the self-initiated checkpoint or the
>>>> > DirectRunner-issued checkpoint gives you the performance regression,
>>>> > since there is overhead when rescheduling residuals. In your case, it's
>>>> > more likely that the checkpoint behavior of
>>>> > OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>>> > elements per batch. I want to understand what kind of performance
>>>> > regression you are noticing: is it slower to output the same amount of
>>>> > records?
>>>> >
>>>> > On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Hi Boyuan,
>>>> > >
>>>> > > This is Antonio. I reported the KafkaIO.read() performance issue on
>>>> the
>>>> > > slack channel a few days ago.
>>>> > >
>>>> > > I am not sure if this is helpful, but I have been doing some
>>>> > > debugging on the SDK KafkaIO performance issue for our pipeline and
>>>> > > I would like to provide some observations.
>>>> > >
>>>> > > It looks like, in my case, ReadFromKafkaDoFn.processElement() was
>>>> > > invoked within the same thread, and every time kafkaConsumer.poll()
>>>> > > is called it returns some records, from 1 up to 200, and the
>>>> > > pipeline steps then run on that batch. Each kafkaConsumer.poll()
>>>> > > takes about 0.8 ms. So the polling and the running of the pipeline
>>>> > > are executed sequentially within a single thread, and after
>>>> > > processing a batch of records it needs to wait about 0.8 ms before
>>>> > > it can process the next batch.
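>>>> > >
>>>> > > (A rough back-of-envelope on those numbers: at ~0.8 ms per poll, a
>>>> > > poll returning only 1 record caps throughput from polling alone at
>>>> > > about 1/0.0008 ≈ 1,250 records/sec, while a full 200-record batch
>>>> > > raises that ceiling to ~250K records/sec, so the effective rate
>>>> > > depends heavily on the batch sizes actually returned.)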
>>>> > >
>>>> > > Any suggestions would be appreciated.
>>>> > >
>>>> > > Hope that helps.
>>>> > >
>>>> > > Thanks and regards,
>>>> > >
>>>> > > Antonio.
>>>> > >
>>>> > > On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>>> > > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for
>>>> > > > tracking.
>>>> > > >
>>>> > > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com>
>>>> > > > wrote:
>>>> > > >
>>>> > > > > Thanks for the pointer, Steve! I'll check it out. The execution
>>>> > > > > paths for UnboundedSource and the SDF wrapper are different. It's
>>>> > > > > highly possible that the regression comes either from the
>>>> > > > > invocation path for the SDF wrapper or from the implementation of
>>>> > > > > the SDF wrapper itself.
>>>> > > > >
>>>> > > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org>
>>>> > > > > wrote:
>>>> > > > >
>>>> > > > >> Coincidentally, someone else in the ASF Slack mentioned [1]
>>>> > > > >> yesterday that they were seeing significantly reduced
>>>> > > > >> performance using KafkaIO.Read with the SDF wrapper vs the
>>>> > > > >> unbounded source. They mentioned they were using Flink 1.9.
>>>> > > > >>
>>>> > > > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>> > > > >>
>>>> > > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com>
>>>> > > > >> wrote:
>>>> > > > >>
>>>> > > > >>> Hi Steve,
>>>> > > > >>>
>>>> > > > >>> I think the major performance regression comes from
>>>> > > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which
>>>> > > > >>> checkpoints the DoFn based on a time/output limit and uses
>>>> > > > >>> timers/state to reschedule the work.
>>>> > > > >>>
>>>> > > > >>> [1]
>>>> > > > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>> > > > >>>
>>>> > > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
>>>> > > > >>> wrote:
>>>> > > > >>>
>>>> > > > >>>> I have a pipeline that reads from pubsub, does some
>>>> > > > >>>> aggregation, and writes to various places. Previously, in
>>>> > > > >>>> older versions of beam, when running this in the DirectRunner,
>>>> > > > >>>> messages would go through the pipeline almost instantly,
>>>> > > > >>>> making it very easy to debug locally, etc.
>>>> > > > >>>>
>>>> > > > >>>> However, after upgrading to beam 2.25, I noticed that it could
>>>> > > > >>>> take on the order of 5-10 minutes for messages to get from the
>>>> > > > >>>> pubsub read step to the next step in the pipeline
>>>> > > > >>>> (deserializing them, etc.). The subscription being read from
>>>> > > > >>>> has on the order of 100,000 elements/sec arriving in it.
>>>> > > > >>>>
>>>> > > > >>>> Setting --experiments=use_deprecated_read fixes it, and makes
>>>> > > > >>>> the pipeline behave as it did before.
>>>> > > > >>>>
>>>> > > > >>>> It seems like the SDF implementation in the DirectRunner here
>>>> > > > >>>> is causing some kind of issue, either buffering a very large
>>>> > > > >>>> amount of data before emitting it in a bundle, or something
>>>> > > > >>>> else. Has anyone else run into this?
>>>> > > > >>>>
>>>> > > > >>>
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
