Thanks for your explanation, Jan. Now I can see what you mean. I can try to
put together a PR for this optimization. Would you like to share your GitHub
ID with me so you can review the PR later?
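
To double-check that I understand the idea, here is a rough sketch of what I
have in mind for the PR (a hypothetical helper, not existing Beam code - the
names are made up for illustration):

import java.io.IOException;
import java.util.Objects;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/** Caches the open reader together with the CheckpointMark it stopped at. */
class ReaderCache<OutputT, MarkT extends UnboundedSource.CheckpointMark> {

  private UnboundedSource.UnboundedReader<OutputT> reader; // possibly null, i.e. not yet open
  private MarkT lastCheckpointMark; // mark taken when the reader last stopped

  UnboundedSource.UnboundedReader<OutputT> readerFor(
      UnboundedSource<OutputT, MarkT> source, MarkT restrictionMark, PipelineOptions options)
      throws IOException {
    if (reader != null && Objects.equals(lastCheckpointMark, restrictionMark)) {
      // Happy path: the new restriction starts exactly where the reader stopped.
      return reader;
    }
    if (reader != null) {
      // Marks differ, e.g. after recovery from a checkpoint; drop the stale reader.
      reader.close();
    }
    reader = source.createReader(options, restrictionMark);
    return reader;
  }

  /** Remembers the mark taken when processElement returns. */
  void onCheckpointTaken(MarkT mark) {
    lastCheckpointMark = mark;
  }
}

The reuse condition relies on the CheckpointMark implementation having a
meaningful equals(); as you suggest, the cache always keeps the last mark, so
at worst we just recreate the reader.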

On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw <[email protected]>
wrote:

> If readers are expensive to create, this seems like an important (and not
> too difficult) optimization.
>
> On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Boyuan,
>>
>> I think your analysis is correct - with one exception. It should be
>> possible to reuse the reader if and only if the last taken CheckpointMark
>> equals the new CheckpointMark the reader would be created from. But this
>> equality holds on the happy path and should be satisfied for the vast
>> majority of invocations, so it would spare many calls to createReader.
>> Actually, the marks should differ only after recovery from a checkpoint,
>> and in that case there should be no open reader anyway. So to be
>> technically correct, we should keep the last CheckpointMark along with the
>> open reader, though that might turn out to be unnecessary (I'm not sure
>> about that, and I would definitely keep the last CheckpointMark, because
>> it is better to be safe than sorry :))
>>
>> Jan
>> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>>
>> Hi Jan,
>>>
>>> it seems that what we would want is to couple the lifecycle of the
>>> Reader not with the restriction but with the particular instance of the
>>> (Un)boundedSource (after being split). That could be done in the
>>> processing DoFn, if it contained a cache mapping the instance of the
>>> source to the (possibly null, i.e. not yet opened) reader. In @NewTracker
>>> we could assign (or create) the reader to the tracker, as the tracker is
>>> created for each restriction.
>>>
>>> WDYT?
>>>
>> I was thinking about this, but it seems like it is not applicable to the
>> way UnboundedSource and UnboundedReader work together.
>> Please correct me if I'm wrong. The UnboundedReader is created from the
>> UnboundedSource per CheckpointMark[1], which means that for certain
>> sources, the CheckpointMark could affect attributes such as the start
>> position of the reader when resuming. So a single UnboundedSource could be
>> mapped to multiple readers because of different instances of
>> CheckpointMark. That's also the reason why we use the CheckpointMark as
>> the restriction.
>>
>> Please let me know if I misunderstand your suggestion.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
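>>
>> For reference, the method in [1] is roughly the following (quoted from
>> memory, the link above is the authoritative version):
>>
>> public abstract UnboundedReader<OutputT> createReader(
>>     PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
>>     throws IOException;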
>>
>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <[email protected]> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Sorry for my late reply. I was off for a few days.
>>>
>>> I didn't use DirectRunner. I am using FlinkRunner.
>>>
>>> We measured the number of Kafka messages that we can process per second.
>>> With Beam v2.26 with --experiments=use_deprecated_read and
>>> --fasterCopy=true, we are able to consume 13K messages per second, but
>>> with Beam v2.26 without the use_deprecated_read option, we are only able
>>> to process 10K messages per second for the same pipeline.
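>>>
>>> In case it is useful, this is roughly how we pass those flags when
>>> building the pipeline options (a simplified sketch of our launcher, not
>>> the exact code we run):
>>>
>>> import org.apache.beam.runners.flink.FlinkPipelineOptions;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>
>>> public class LaunchSketch {
>>>   public static void main(String[] args) {
>>>     FlinkPipelineOptions options =
>>>         PipelineOptionsFactory.fromArgs(
>>>                 "--runner=FlinkRunner",
>>>                 "--experiments=use_deprecated_read",
>>>                 "--fasterCopy=true")
>>>             .withValidation()
>>>             .as(FlinkPipelineOptions.class);
>>>     Pipeline pipeline = Pipeline.create(options);
>>>     // ... build the KafkaIO pipeline here ...
>>>     pipeline.run();
>>>   }
>>> }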
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/11 22:19:40, Boyuan Zhang <[email protected]> wrote:
>>> > Hi Antonio,
>>> >
>>> > Thanks for the details! Which version of the Beam SDK are you using?
>>> > And are you using --experiments=beam_fn_api with DirectRunner to launch
>>> > your pipeline?
>>> >
>>> > For ReadFromKafkaDoFn.processElement(), it takes a Kafka
>>> > topic+partition as its input element; a KafkaConsumer is assigned to
>>> > this topic+partition and then polls records continuously. The Kafka
>>> > consumer stops reading and returns from the process fn when:
>>> >
>>> >    - There are currently no available records (this is a feature of
>>> >    SDF called SDF self-initiated checkpointing), or
>>> >    - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>> >    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>>> >    checkpoint frequency for DirectRunner is every 100 output records or
>>> >    every 1 second (see the sketch below).
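>>> >
>>> > A rough sketch of those two exit paths (NOT the actual ReadFromKafkaDoFn
>>> > source; the element and output types are simplified to strings):
>>> >
>>> > import java.util.Collections;
>>> > import java.util.List;
>>> > import org.apache.beam.sdk.io.range.OffsetRange;
>>> > import org.apache.beam.sdk.transforms.DoFn;
>>> > import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>>> >
>>> > @DoFn.UnboundedPerElement
>>> > class SketchReadDoFn extends DoFn<String, String> {
>>> >
>>> >   @GetInitialRestriction
>>> >   public OffsetRange initialRestriction(@Element String topicPartition) {
>>> >     return new OffsetRange(0L, Long.MAX_VALUE);
>>> >   }
>>> >
>>> >   @ProcessElement
>>> >   public ProcessContinuation processElement(
>>> >       RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<String> out) {
>>> >     long offset = tracker.currentRestriction().getFrom();
>>> >     while (true) {
>>> >       List<String> records = poll(); // stand-in for KafkaConsumer.poll()
>>> >       if (records.isEmpty()) {
>>> >         // (1) SDF self-initiated checkpoint: return now, resume later.
>>> >         return ProcessContinuation.resume();
>>> >       }
>>> >       for (String record : records) {
>>> >         if (!tracker.tryClaim(offset)) {
>>> >           // (2) Runner-issued checkpoint (the invoker's time/output limit):
>>> >           // stop and let the runner reschedule the residual restriction.
>>> >           return ProcessContinuation.stop();
>>> >         }
>>> >         out.output(record);
>>> >         offset++;
>>> >       }
>>> >     }
>>> >   }
>>> >
>>> >   private List<String> poll() {
>>> >     return Collections.emptyList(); // placeholder for real Kafka polling
>>> >   }
>>> > }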
>>> >
>>> > It seems like either the self-initiated checkpoint or the
>>> > DirectRunner-issued checkpoint gives you the performance regression,
>>> > since there is overhead when rescheduling residuals. In your case, it
>>> > looks more like the checkpoint behavior of
>>> > OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>> > elements per batch. I want to understand what kind of performance
>>> > regression you are noticing - is it slower to output the same amount of
>>> > records?
>>> >
>>> > On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:
>>> >
>>> > > Hi Boyuan,
>>> > >
>>> > > This is Antonio. I reported the KafkaIO.read() performance issue on
>>> > > the Slack channel a few days ago.
>>> > >
>>> > > I am not sure if this is helpful, but I have been doing some debugging
>>> > > on the SDK KafkaIO performance issue for our pipeline and I would like
>>> > > to provide some observations.
>>> > >
>>> > > It looks like in my case the ReadFromKafkaDoFn.processElement() was
>>> > > invoked within the same thread, and every time kafkaconsumer.poll()
>>> > > is called, it returns some records, from 1 up to 200 records, so it
>>> > > proceeds to run the pipeline steps. Each kafkaconsumer.poll() takes
>>> > > about 0.8ms. So, in this case, the polling and the running of the
>>> > > pipeline are executed sequentially within a single thread, and after
>>> > > processing a batch of records, it needs to wait for 0.8ms before it
>>> > > can process the next batch of records again.
>>> > >
>>> > > Any suggestions would be appreciated.
>>> > >
>>> > > Hope that helps.
>>> > >
>>> > > Thanks and regards,
>>> > >
>>> > > Antonio.
>>> > >
>>> > > On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:
>>> > > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>> > > >
>>> > > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
>>> > > >
>>> > > > > Thanks for the pointer, Steve! I'll check it out. The execution
>>> > > > > paths for UnboundedSource and the SDF wrapper are different. It's
>>> > > > > highly possible that the regression comes either from the
>>> > > > > invocation path for the SDF wrapper or from the implementation of
>>> > > > > the SDF wrapper itself.
>>> > > > >
>>> > > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
>>> > > > >
>>> > > > >> Coincidentally, someone else in the ASF Slack mentioned [1]
>>> > > > >> yesterday that they were seeing significantly reduced performance
>>> > > > >> using KafkaIO.Read with the SDF wrapper vs. the unbounded source.
>>> > > > >> They mentioned they were using Flink 1.9.
>>> > > > >>
>>> > > > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>> > > > >>
>>> > > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
>>> > > > >>
>>> > > > >>> Hi Steve,
>>> > > > >>>
>>> > > > >>> I think the major performance regression comes from
>>> > > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which
>>> > > > >>> checkpoints the DoFn based on a time/output limit and uses
>>> > > > >>> timers/state to reschedule the work.
>>> > > > >>>
>>> > > > >>> [1]
>>> > > > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
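>>> > > > >>>
>>> > > > >>> Conceptually the checkpoint decision looks like the following
>>> > > > >>> (illustration only, not the invoker's actual code; the limits
>>> > > > >>> shown are the DirectRunner defaults mentioned earlier):
>>> > > > >>>
>>> > > > >>> import java.time.Duration;
>>> > > > >>> import java.time.Instant;
>>> > > > >>>
>>> > > > >>> class OutputAndTimeBound {
>>> > > > >>>   private final int maxOutputs = 100;                     // outputs per bundle
>>> > > > >>>   private final Duration maxTime = Duration.ofSeconds(1); // time per bundle
>>> > > > >>>   private final Instant start = Instant.now();
>>> > > > >>>   private int outputs;
>>> > > > >>>
>>> > > > >>>   // Called after each output; when it returns true the invoker asks the
>>> > > > >>>   // tracker to checkpoint and the residual is rescheduled via timers/state.
>>> > > > >>>   boolean shouldCheckpoint() {
>>> > > > >>>     outputs++;
>>> > > > >>>     return outputs >= maxOutputs || Instant.now().isAfter(start.plus(maxTime));
>>> > > > >>>   }
>>> > > > >>> }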
>>> > > > >>>
>>> > > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:
>>> > > > >>>
>>> > > > >>>> I have a pipeline that reads from pubsub, does some
>>> > > > >>>> aggregation, and writes to various places. Previously, in older
>>> > > > >>>> versions of Beam, when running this in the DirectRunner,
>>> > > > >>>> messages would go through the pipeline almost instantly, making
>>> > > > >>>> it very easy to debug locally, etc.
>>> > > > >>>>
>>> > > > >>>> However, after upgrading to Beam 2.25, I noticed that it could
>>> > > > >>>> take on the order of 5-10 minutes for messages to get from the
>>> > > > >>>> pubsub read step to the next step in the pipeline
>>> > > > >>>> (deserializing them, etc.). The subscription being read from
>>> > > > >>>> has on the order of 100,000 elements/sec arriving in it.
>>> > > > >>>>
>>> > > > >>>> Setting --experiments=use_deprecated_read fixes it, and makes
>>> > > > >>>> the pipeline behave as it did before.
>>> > > > >>>>
>>> > > > >>>> It seems like the SDF implementation in the DirectRunner here is
>>> > > > >>>> causing some kind of issue, either buffering a very large amount
>>> > > > >>>> of data before emitting it in a bundle, or something else. Has
>>> > > > >>>> anyone else run into this?
>>> > > > >>>>
>>> > > > >>>
>>> > > >
>>> > >
>>> >
>>>
>>
