Sorry for the typo in your name. :-)
On 1/6/21 10:11 AM, Jan Lukavský wrote:
Hi Antonie,
yes, for instance. I'd just like to rule out the possibility that a single
DoFn processing multiple partitions (restrictions) brings some
overhead in your case.
Jan
On 12/31/20 10:36 PM, Antonio Si wrote:
Hi Jan,
Sorry for the late reply. My topic has 180 partitions. Do you mean
run with a
parallelism set to 900?
Thanks.
Antonio.
On 2020/12/23 20:30:34, Jan Lukavský <[email protected]> wrote:
OK,
could you make an experiment and increase the parallelism to something
significantly higher than the total number of partitions? Say 5 times
higher? Would that have an impact on throughput in your case?
Jan
On 12/23/20 7:03 PM, Antonio Si wrote:
Hi Jan,
The performance data that I reported was run with parallelism = 8.
We also ran with parallelism = 15 and we observed similar behaviors
although I don't have the exact numbers. I can get you the numbers
if needed.
Regarding number of partitions, since we have multiple topics, the
number of partitions varies from 180 to 12. The highest TPS topic
has 180 partitions, while the lowest TPS topic has 12 partitions.
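To put these numbers side by side, here is a rough sketch of how many partitions a single parallel source instance would have to read at each parallelism mentioned in this thread. It assumes partitions are spread evenly across instances, which is an assumption and not something confirmed here; the helper name `partitions_per_instance` is made up for illustration.

```python
import math


def partitions_per_instance(num_partitions: int, parallelism: int) -> int:
    """Largest number of partitions one instance reads, assuming an even spread."""
    if num_partitions < 0 or parallelism <= 0:
        raise ValueError("num_partitions must be >= 0 and parallelism must be > 0")
    return math.ceil(num_partitions / parallelism)


# Parallelism values from the thread: 8 and 15 were tried, 900 is 5 x 180.
for parallelism in (8, 15, 900):
    busiest = partitions_per_instance(180, parallelism)
    quietest = partitions_per_instance(12, parallelism)
    print(f"parallelism={parallelism}: 180-partition topic -> {busiest}, "
          f"12-partition topic -> {quietest}")
```

Under that assumption, only at a parallelism of 180 or more does each instance handle at most one partition of the largest topic, which is the situation the proposed experiment would create.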
Thanks.
Antonio.
On 2020/12/23 12:28:42, Jan Lukavský <[email protected]> wrote:
Hi Antonio,
can you please clarify a few things:
a) what parallelism you use for your sources
b) how many partitions there are in your topic(s)
Thanks,
Jan
On 12/22/20 10:07 PM, Antonio Si wrote:
Hi Boyuan,
Let me clarify, I have tried with and without using
--experiments=beam_fn_api,use_sdf_kafka_read option:
- with --experiments=use_deprecated_read --fasterCopy=true, I
am able to achieve 13K TPS
- with --experiments="beam_fn_api,use_sdf_kafka_read"
--fasterCopy=true, I am able to achieve 10K TPS
- with --fasterCopy=true alone, I am only able to achieve 5K TPS
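As a quick sanity check on the three figures above, the relative gap to the use_deprecated_read run can be computed directly. This is plain arithmetic on the reported numbers; the function name `relative_drop` is only illustrative.

```python
def relative_drop(baseline_tps: float, observed_tps: float) -> float:
    """Fraction by which observed throughput falls short of the baseline."""
    if baseline_tps <= 0:
        raise ValueError("baseline_tps must be positive")
    return (baseline_tps - observed_tps) / baseline_tps


BASELINE_TPS = 13_000  # use_deprecated_read + fasterCopy, as reported above

for label, tps in (
    ("beam_fn_api,use_sdf_kafka_read + fasterCopy", 10_000),
    ("fasterCopy alone", 5_000),
):
    print(f"{label}: {relative_drop(BASELINE_TPS, tps):.0%} below the baseline")
```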
In our test case, we have multiple topics, and the checkpoint interval is
60s. Some topics have much higher traffic than others. We looked
a little at the case with --experiments="beam_fn_api,use_sdf_kafka_read"
--fasterCopy=true. Based on our observation,
each consumer poll() in ReadFromKafkaDoFn.processElement() takes
about 0.8ms. For a topic with high traffic, it will stay in
the loop because every poll() returns some records. Every
poll returns about 200 records, so it takes about 0.8ms for
every 200 records. I am not sure if that is part of the reason
for the performance difference.
Thanks.
Antonio.
On 2020/12/21 19:03:19, Boyuan Zhang <[email protected]> wrote:
Hi Antonio,
Thanks for the data point. That's very valuable information!
We do have an SDF implementation of Kafka Read, as an alternative to
the wrapper. Would you like to try it to see whether it helps improve
your situation? You can use
--experiments=beam_fn_api,use_sdf_kafka_read to
switch to the Kafka SDF Read.
On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang
<[email protected]> wrote:
Hi Jan,
it seems that what we would want is to couple the lifecycle of
the Reader
not with the restriction but with the particular instance of
(Un)boundedSource (after being split). That could be done in
the processing
DoFn, if it contained a cache mapping instance of the source
to the
(possibly null - i.e. not yet open) reader. In @NewTracker we
could assign
(or create) the reader to the tracker, as the tracker is
created for each
restriction.
WDYT?
I was thinking about this, but it seems it is not applicable to the
way UnboundedSource and UnboundedReader work together.
Please correct me if I'm wrong. The UnboundedReader is created from
UnboundedSource per CheckpointMark[1], which means that for certain
sources the CheckpointMark could affect attributes such as the start
position of the reader when resuming. So a single UnboundedSource could
be mapped to multiple readers because of different instances of
CheckpointMark. That's also the reason why we use CheckpointMark as the
restriction.
Please let me know if I misunderstand your suggestion.
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
On Mon, Dec 21, 2020 at 9:18 AM Antonio Si
<[email protected]> wrote:
Hi Boyuan,
Sorry for my late reply. I was off for a few days.
I didn't use DirectRunner. I am using FlinkRunner.
We measured the number of Kafka messages that we can process
per second.
With Beam v2.26 with --experiments=use_deprecated_read and
--fasterCopy=true, we are able to consume 13K messages per second,
but with Beam v2.26 without the use_deprecated_read option, we are
only able to process 10K messages per second for the same pipeline.
Thanks and regards,
Antonio.
On 2020/12/11 22:19:40, Boyuan Zhang <[email protected]> wrote:
Hi Antonio,
Thanks for the details! Which version of Beam SDK are you
using? And are
you using --experiments=beam_fn_api with DirectRunner to
launch your
pipeline?
ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as
its input element; a KafkaConsumer is assigned to this topic+partition
and then polls records continuously. The DoFn returns from the process
fn, to resume reading later, when:
- There are currently no available records (this is an SDF feature
called an SDF self-initiated checkpoint).
- The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
checkpoint request to ReadFromKafkaDoFn to get partial results. The
checkpoint frequency for DirectRunner is every 100 output records or
every 1 second.
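The two stop conditions above can be illustrated with a toy model. This is a standalone sketch, not Beam code: `run_bundle`, its parameters, and the list slice standing in for consumer.poll() are all made up for illustration. The defaults of 100 outputs and 1 second are the DirectRunner limits quoted above, and the batch size of 200 is the poll size reported in this thread.

```python
import time


def run_bundle(records, start, max_outputs=100, max_seconds=1.0,
               batch_size=200, clock=time.monotonic):
    """Emit records from `start` until a checkpoint condition is hit.

    Returns (emitted, next_position, reason). Records that were polled but
    not emitted are not lost: they belong to the residual starting at
    next_position.
    """
    began = clock()
    pos = start
    emitted = []
    while True:
        batch = records[pos:pos + batch_size]  # stand-in for consumer.poll()
        if not batch:
            # Nothing available right now: self-initiated checkpoint.
            return emitted, pos, "self_checkpoint"
        for record in batch:
            if len(emitted) >= max_outputs:
                return emitted, pos, "output_limit"
            if clock() - began >= max_seconds:
                return emitted, pos, "time_limit"
            emitted.append(record)
            pos += 1


records = list(range(250))
position = 0
while True:
    out, position, reason = run_bundle(records, position)
    print(f"emitted {len(out)} records, stopped because of {reason}")
    if reason == "self_checkpoint":
        break
```

In this model a backlog of 250 records is drained in three invocations, and each return corresponds to a point where the runner would have to reschedule the residual.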
It seems like either the self-initiated checkpoint or the
DirectRunner-issued checkpoint gives you the performance regression,
since there is overhead when rescheduling residuals. In your case,
it's more likely that the checkpoint behavior of
OutputAndTimeBoundedSplittableProcessElementInvoker
gives you 200 elements a batch. I want to understand what kind of
performance regression you are noticing. Is it slower to output the
same amount of records?
On Fri, Dec 11, 2020 at 1:31 PM Antonio Si
<[email protected]>
wrote:
Hi Boyuan,
This is Antonio. I reported the KafkaIO.read() performance
issue on
the
slack channel a few days ago.
I am not sure if this is helpful, but I have been doing some
debugging on
the SDK KafkaIO performance issue for our pipeline and I
would like to
provide some observations.
It looks like in my case ReadFromKafkaDoFn.processElement() was
invoked within the same thread, and every time kafkaconsumer.poll() is
called, it returns some records, from 1 up to 200 records. So, it will
proceed to run the pipeline steps. Each kafkaconsumer.poll() takes
about 0.8ms. So, in this case, the polling and the running of the
pipeline are executed sequentially within a single thread. After
processing a batch of records, it needs to wait for 0.8ms before it
can process the next batch of records.
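A back-of-the-envelope model of this sequential loop: if each cycle is one poll followed by processing of the returned batch, single-thread throughput is the batch size divided by the sum of the two times. The 200 records and 0.8ms come from the observation above; the per-batch processing times below are purely hypothetical, and `sequential_throughput` is an illustrative name.

```python
def sequential_throughput(batch_size: int, poll_seconds: float,
                          process_seconds: float) -> float:
    """Records per second when poll and processing run back to back."""
    cycle = poll_seconds + process_seconds
    if cycle <= 0:
        raise ValueError("total cycle time must be positive")
    return batch_size / cycle


POLL_SECONDS = 0.0008  # about 0.8ms per poll, as observed
BATCH_SIZE = 200       # upper end of the observed batch size

# Hypothetical per-batch processing times, only to show the trend.
for process_seconds in (0.0, 0.0032, 0.0192):
    rate = sequential_throughput(BATCH_SIZE, POLL_SECONDS, process_seconds)
    print(f"process={process_seconds * 1000:.1f}ms -> {rate:,.0f} records/s")
```

With zero processing time the poll alone would cap a single thread at roughly 250,000 records per second for full batches, so under this model the 0.8ms wait matters mostly when batches are small or processing is very cheap.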
Any suggestions would be appreciated.
Hope that helps.
Thanks and regards,
Antonio.
On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]>
wrote:
Opened https://issues.apache.org/jira/browse/BEAM-11403 for
tracking.
On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang
<[email protected]>
wrote:
Thanks for the pointer, Steve! I'll check it out. The execution paths
for UnboundedSource and the SDF wrapper are different. It's highly
possible that the regression either comes from the invocation path for
the SDF wrapper, or the implementation of the SDF wrapper itself.
On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz
<[email protected]>
wrote:
Coincidentally, someone else in the ASF slack mentioned [1] yesterday
that they were seeing significantly reduced performance using
KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They
mentioned they were using flink 1.9.
[1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang
<[email protected]>
wrote:
Hi Steve,
I think the major performance regression comes from
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
checkpoint the DoFn based on time/output limit and use timers/state to
reschedule work.
[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <
[email protected]>
wrote:
I have a pipeline that reads from pubsub, does some aggregation, and
writes to various places. Previously, in older versions of beam, when
running this in the DirectRunner, messages would go through the
pipeline almost instantly, making it very easy to debug locally, etc.
However, after upgrading to beam 2.25, I noticed that it could take on
the order of 5-10 minutes for messages to get from the pubsub read
step to the next step in the pipeline (deserializing them, etc). The
subscription being read from has on the order of 100,000 elements/sec
arriving in it.
Setting --experiments=use_deprecated_read fixes it, and makes the
pipeline behave as it did before.
It seems like the SDF implementation in the DirectRunner here is
causing some kind of issue, either buffering a very large amount of
data before emitting it in a bundle, or something else. Has anyone
else run into this?