> Also are there any producer optimizations

You mean for the "first layer", right?
In that case, make sure you don't call get() on the result of Producer#send()
for each message, because that defeats the producer's batching.
The Kafka producer batches messages by default and does so very efficiently, so
if you produce asynchronously, it rarely becomes a bottleneck.
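
To make the batching point concrete, here is a minimal sketch of producer settings that influence batching. The keys (bootstrap.servers, linger.ms, batch.size, compression.type, acks) are standard Kafka producer configuration names; the values and the ProducerTuning class are illustrative assumptions, not recommendations from this thread, and should be validated by measurement.

```java
import java.util.Properties;

final class ProducerTuning {
    // Builds producer settings that favour batching.
    // All values below are illustrative assumptions; tune them by measurement.
    static Properties batchingFriendlyProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait a little so that more records can join the same batch.
        props.setProperty("linger.ms", "20");
        // Upper bound, in bytes, of a single per-partition batch.
        props.setProperty("batch.size", "65536");
        // Compression is applied per batch, so larger batches compress better.
        props.setProperty("compression.type", "lz4");
        props.setProperty("acks", "all");
        // Note: key.serializer and value.serializer are also required
        // before these properties can be used to create a producer.
        return props;
    }

    public static void main(String[] args) {
        batchingFriendlyProps("localhost:9092")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be passed to the KafkaProducer constructor. To stay asynchronous, handle the outcome in the callback variant of send() rather than calling get() on each returned Future.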

By the way, if the "first layer" just filters and then produces messages
without interacting with any external DB, using Kafka Streams should be much
easier.

On Tue, Dec 22, 2020 at 3:27, Yana K <yanak1...@gmail.com> wrote:

> Thanks!
>
> Also are there any producer optimizations anyone can think of in this
> scenario?
>
>
>
> On Mon, Dec 21, 2020 at 8:58 AM Joris Peeters <joris.mg.peet...@gmail.com>
> wrote:
>
> > I'd probably just do it by experiment for your concrete data.
> >
> > Maybe generate a few million synthetic data rows, and for-each-batch insert
> > them into a dev DB, with an outer grid search over various candidate batch
> > sizes. You're looking to optimise for flat-out rows/s, so whichever batch
> > size wins (given a fixed number of total rows) is near-optimal.
> > You can repeat the exercise with N simultaneous threads to inspect how
> > batch sizes and multiple partitions P would interact (which might well be
> > sublinear in P in case of e.g. transactions etc).
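
The experiment described above could be sketched roughly as follows. This is only a harness outline under stated assumptions: BatchSizeGridSearch and its methods are hypothetical names, and the bulkInsert callback is a placeholder that would have to be replaced by a real bulk insert into the dev DB for the numbers to mean anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchSizeGridSearch {
    // Splits rows into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> toBatches(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            batches.add(rows.subList(i, Math.min(i + batchSize, rows.size())));
        }
        return batches;
    }

    // Inserts all rows batch by batch and returns the observed rows per second.
    static <T> double rowsPerSecond(List<T> rows, int batchSize, Consumer<List<T>> bulkInsert) {
        long start = System.nanoTime();
        for (List<T> batch : toBatches(rows, batchSize)) {
            bulkInsert.accept(batch);
        }
        long elapsedNanos = Math.max(1, System.nanoTime() - start);
        return rows.size() * 1e9 / elapsedNanos;
    }

    // Tries every candidate and returns the batch size with the highest throughput.
    static <T> int bestBatchSize(List<T> rows, int[] candidates, Consumer<List<T>> bulkInsert) {
        int best = candidates[0];
        double bestRate = -1;
        for (int size : candidates) {
            double rate = rowsPerSecond(rows, size, bulkInsert);
            System.out.printf("batch=%d -> %.0f rows/s%n", size, rate);
            if (rate > bestRate) {
                bestRate = rate;
                best = size;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            rows.add("row-" + i);
        }
        // Placeholder sink (assumption): swap in a real bulk insert into the dev DB.
        Consumer<List<String>> fakeInsert = batch -> { };
        int best = bestBatchSize(rows, new int[] {10, 100, 1000, 10_000}, fakeInsert);
        System.out.println("best batch size: " + best);
    }
}
```

Running the same loop from N threads, each with its own connection, would give the multi-partition variant of the experiment.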
> >
> > On Mon, Dec 21, 2020 at 4:48 PM Yana K <yanak1...@gmail.com> wrote:
> >
> > > Thanks Haruki and Joris.
> > >
> > > Haruki:
> > > Thanks for the detailed calculations. Really appreciate it. What tool/lib
> > > is used to load test Kafka?
> > > So we have one consumer group and are running 7 instances of the
> > > application. That should be good enough, correct?
> > >
> > > Joris:
> > > Great point.
> > > DB insert is a bottleneck (hence I moved it to its own layer). We are
> > > batching, but wondering what the best way to calculate the batch size is.
> > >
> > > Thanks,
> > > Yana
> > >
> > > On Mon, Dec 21, 2020 at 1:39 AM Joris Peeters <joris.mg.peet...@gmail.com>
> > > wrote:
> > >
> > > > Do you know why your consumers are so slow? 12E6 msg/hour is 3333 msg/s,
> > > > which is not very high from a Kafka point of view. As you're doing
> > > > database inserts, I suspect that is where the bottleneck lies.
> > > >
> > > > If, for example, you're doing a single-row insert in a SQL DB for every
> > > > message then this would incur a lot of overhead. Yes, you can somewhat
> > > > alleviate that by parallelising - i.e. increasing the partition count -
> > > > but it is also worth looking at batch inserts, if you aren't yet. Say, each
> > > > consumer waits for 1000 messages or 5 seconds to have passed (whichever
> > > > comes first) and then does a single bulk insert of the msgs it has
> > > > received, followed by a manual commit.
> > > >
> > > > [A] you might already be doing this and [B] your DB of choice might not
> > > > support bulk inserts (although most do), but otherwise I'd expect this to
> > > > work a lot better than increasing the partition count.
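
The "1000 messages or 5 seconds, whichever comes first" rule above can be sketched as a small buffer. This is a simplified illustration, not Spring-Kafka API: SizeOrTimeBatcher is a hypothetical class, the flush action stands in for "bulk insert, then manual commit", and the clock is injected only to make the behaviour easy to test. It is not thread-safe and assumes it is driven from the single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

final class SizeOrTimeBatcher<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final LongSupplier clockMillis;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();
    private long oldestMillis;

    SizeOrTimeBatcher(int maxSize, long maxAgeMillis,
                      LongSupplier clockMillis, Consumer<List<T>> flushAction) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clockMillis = clockMillis;
        this.flushAction = flushAction;
    }

    // Buffers one message and flushes if either limit has been reached.
    void add(T message) {
        if (buffer.isEmpty()) {
            oldestMillis = clockMillis.getAsLong(); // age is measured from the first message
        }
        buffer.add(message);
        flushIfDue();
    }

    // Should also be called periodically (e.g. after every poll),
    // so that a quiet topic still gets flushed on time.
    void flushIfDue() {
        if (buffer.isEmpty()) {
            return;
        }
        boolean full = buffer.size() >= maxSize;
        boolean old = clockMillis.getAsLong() - oldestMillis >= maxAgeMillis;
        if (full || old) {
            flushAction.accept(new ArrayList<>(buffer)); // hand over a copy
            buffer.clear();
        }
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<>(
                1000, 5000, System::currentTimeMillis,
                batch -> System.out.println("bulk insert of " + batch.size() + " rows, then commit"));
        for (int i = 0; i < 2500; i++) {
            batcher.add("msg-" + i);
        }
        System.out.println("still buffered: " + batcher.pending());
    }
}
```

Committing offsets only inside the flush action, after the insert has succeeded, means a crash replays at most one unflushed batch.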
> > > >
> > > > On Mon, Dec 21, 2020 at 8:10 AM Haruki Okada <ocadar...@gmail.com> wrote:
> > > >
> > > > > About load test:
> > > > > I think it'd be better to monitor per-message processing latency and
> > > > > estimate the required partition count based on it, because it determines
> > > > > the max throughput per single partition.
> > > > > - Say you have to process 12 million messages/hour = 3333 messages/sec.
> > > > > - If you have 7 partitions (thus 7 parallel consumers at maximum), a single
> > > > > consumer should process 3333 / 7 = 476 messages/sec
> > > > > - That means the processing latency per single message should be lower than
> > > > > 2.1 milliseconds (1000 / 476)
> > > > >   => If you have 14 partitions, it becomes 4.2 milliseconds
> > > > >
> > > > > So the required partition count can be calculated from the per-message
> > > > > processing latency. (I think Spring-Kafka can be easily integrated with
> > > > > Prometheus, so you can use it to measure that)
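
The arithmetic above can be written down as a small helper. This is a sketch that assumes strictly sequential processing per partition, as described in the thread; PartitionMath and its method names are hypothetical.

```java
final class PartitionMath {
    // Converts an hourly message volume to messages per second.
    static double messagesPerSecond(long messagesPerHour) {
        return messagesPerHour / 3600.0;
    }

    // Maximum per-message latency (ms) that still keeps up with the input,
    // given one sequential consumer per partition.
    static double maxLatencyMillis(long messagesPerHour, int partitions) {
        return 1000.0 * partitions / messagesPerSecond(messagesPerHour);
    }

    // Minimum partition count needed for a measured per-message latency (ms).
    static int requiredPartitions(long messagesPerHour, double latencyMillis) {
        return (int) Math.ceil(messagesPerSecond(messagesPerHour) * latencyMillis / 1000.0);
    }

    public static void main(String[] args) {
        long perHour = 12_000_000L;
        System.out.printf("7 partitions:  max %.1f ms per message%n", maxLatencyMillis(perHour, 7));
        System.out.printf("14 partitions: max %.1f ms per message%n", maxLatencyMillis(perHour, 14));
        System.out.println("partitions needed at 10 ms per message: " + requiredPartitions(perHour, 10.0));
    }
}
```

With a measured latency of 10 ms per message, for example, the same formula gives ceil(3333.3 * 10 / 1000) = 34 partitions.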
> > > > >
> > > > > About increasing instance count:
> > > > > - It depends on current system resource usage.
> > > > >   * If the system resources are not very busy (likely because the consumer
> > > > > mostly just waits for the DB write to return), you don't need to increase
> > > > > consumer instances
> > > > >   * But I think you should make sure that a single consumer instance isn't
> > > > > assigned multiple partitions, to fully parallelize consumption across
> > > > > partitions. (If I remember correctly, ConcurrentMessageListenerContainer
> > > > > has a property to configure the concurrency)
> > > > >
> > > > > On Mon, Dec 21, 2020 at 15:51, Yana K <yanak1...@gmail.com> wrote:
> > > > >
> > > > > > So as the next step I plan to increase the partitions of the 2nd topic.
> > > > > > Do I increase the instances of the consumer of that topic, or keep it at 7?
> > > > > > Anything else (besides researching those libs)?
> > > > > >
> > > > > > Are there any good tools for load testing kafka?
> > > > > >
> > > > > > On Sun, Dec 20, 2020 at 7:23 PM Haruki Okada <ocadar...@gmail.com> wrote:
> > > > > >
> > > > > > > It depends on how you manually commit offsets.
> > > > > > > Auto-commit basically commits offsets asynchronously, so as long as
> > > > > > > you do manual commits in the same way, there should not be much
> > > > > > > difference.
> > > > > > >
> > > > > > > And, generally, the offset-commit mode doesn't make much difference in
> > > > > > > performance, regardless of manual/auto or async/sync, unless
> > > > > > > offset-commit latency takes up a significant share of processing time
> > > > > > > (e.g. you commit offsets synchronously in every poll() loop).
> > > > > > >
> > > > > > > On Mon, Dec 21, 2020 at 11:08, Yana K <yanak1...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thank you so much Marina and Haruki.
> > > > > > > >
> > > > > > > > Marina's response:
> > > > > > > > - When you say "if you are sure there is no room for perf optimization
> > > > > > > > of the processing itself", do you mean code-level optimizations? Can
> > > > > > > > you please explain?
> > > > > > > > - On the second topic you say "I'd say at least 40" - is this based on
> > > > > > > > 12 million records / hour?
> > > > > > > > - "if you can change the incoming topic" - I don't think it is
> > > > > > > > possible :(
> > > > > > > > - "you could artificially achieve the same by adding one more step
> > > > > > > > (service) in your pipeline" - this is the next thing to try, but I want
> > > > > > > > to be sure this will help, given we'd have to maintain one more layer
> > > > > > > >
> > > > > > > > Haruki's response:
> > > > > > > > - "One possible solution is creating an intermediate topic" - I already
> > > > > > > > did it
> > > > > > > > - I'll look at Decaton - thx
> > > > > > > >
> > > > > > > > Are there any thoughts on auto commit vs manual commit - whether it can
> > > > > > > > improve performance while consuming?
> > > > > > > >
> > > > > > > > Yana
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada <ocadar...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi.
> > > > > > > > >
> > > > > > > > > Yeah, Spring-Kafka processes messages sequentially, so the consumer
> > > > > > > > > throughput would be capped by the database latency per single process.
> > > > > > > > > One possible solution is creating an intermediate topic (or altering the
> > > > > > > > > source topic) with many more partitions, as Marina suggested.
> > > > > > > > >
> > > > > > > > > I'd like to suggest another solution: multi-threaded processing per
> > > > > > > > > single partition.
> > > > > > > > > Decaton (https://github.com/line/decaton) is a library to achieve it.
> > > > > > > > >
> > > > > > > > > Also, Confluent has published a blog post about parallel-consumer
> > > > > > > > > (https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/)
> > > > > > > > > for that purpose, but it seems it's still in the BETA stage.
> > > > > > > > >
> > > > > > > > > On Sun, Dec 20, 2020 at 11:41, Marina Popova <ppine7...@protonmail.com.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > The way I see it, you can only do a few things, if you are sure there
> > > > > > > > > > is no room for perf optimization of the processing itself:
> > > > > > > > > > 1. speed up your processing per consumer thread, which you already tried
> > > > > > > > > > by splitting your logic into a 2-step pipeline instead of 1-step, and
> > > > > > > > > > delegating the work of writing to a DB to the second step (make sure
> > > > > > > > > > your second, intermediate Kafka topic is created with many more
> > > > > > > > > > partitions to be able to parallelize your work much more - I'd say at
> > > > > > > > > > least 40)
> > > > > > > > > > 2. if you can change the incoming topic, I would create it with many
> > > > > > > > > > more partitions as well - say at least 40 or so - to parallelize your
> > > > > > > > > > first-step service processing more
> > > > > > > > > > 3. and if you can't increase partitions for the original topic, you
> > > > > > > > > > could artificially achieve the same by adding one more step (service) in
> > > > > > > > > > your pipeline that would just read data from the original 7-partition
> > > > > > > > > > topic1 and push it unchanged into a new topic2 with, say, 40
> > > > > > > > > > partitions - and then have your other services pick up from this topic2
> > > > > > > > > >
> > > > > > > > > > good luck,
> > > > > > > > > > Marina
> > > > > > > > > >
> > > > > > > > > > ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> > > > > > > > > > On Saturday, December 19, 2020 6:46 PM, Yana K <yanak1...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi
> > > > > > > > > > >
> > > > > > > > > > > I am new to the Kafka world and running into this scale problem. I
> > > > > > > > > > > thought of reaching out to the community in case someone can help.
> > > > > > > > > > > So the problem is I am trying to consume from a Kafka topic that can
> > > > > > > > > > > have a peak of 12 million messages/hour. That topic is not under my
> > > > > > > > > > > control - it has 7 partitions and carries JSON payloads.
> > > > > > > > > > > I have written a consumer (I've used Java and the Spring-Kafka lib) that
> > > > > > > > > > > will read that data, filter it and then load it into a database. I ran
> > > > > > > > > > > into a huge consumer lag that would take 10-12 hours to catch up. I have
> > > > > > > > > > > 7 instances of my application running to match the 7 partitions and I am
> > > > > > > > > > > using auto commit. Then I thought of splitting the write logic into a
> > > > > > > > > > > separate layer. So now my architecture has a component that reads and
> > > > > > > > > > > filters and produces the data to an internal topic (I've used 7
> > > > > > > > > > > partitions, but as you see it's under my control). Then a consumer picks
> > > > > > > > > > > up data from that topic and writes it to the database. It's better, but
> > > > > > > > > > > it still takes 3-5 hours for the consumer lag to catch up.
> > > > > > > > > > > Am I missing something fundamental? Are there any other ideas for
> > > > > > > > > > > optimization that can help overcome this scale challenge? Any pointers
> > > > > > > > > > > and articles will help too.
> > > > > > > > > > >
> > > > > > > > > > > Appreciate your help with this.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Yana
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > ========================
> > > > > > > > > Okada Haruki
> > > > > > > > > ocadar...@gmail.com
> > > > > > > > > ========================
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > ========================
> > > > > > > Okada Haruki
> > > > > > > ocadar...@gmail.com
> > > > > > > ========================
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > ========================
> > > > > Okada Haruki
> > > > > ocadar...@gmail.com
> > > > > ========================
> > > > >
> > > >
> > >
> >
>


-- 
========================
Okada Haruki
ocadar...@gmail.com
========================
