@Ryanne:
In my mind's eye I slightly prefer throwing the "cannot enqueue"
exception to satisfying the future immediately with the "cannot enqueue"
exception.  But I agree, it would be worth doing more research.
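
To make the two options concrete, here is a minimal sketch.  Everything in
it (BoundedSender, CannotEnqueueException, sendOrThrow, sendOrFail) is a
hypothetical name for illustration, not existing KafkaProducer API, and the
thread that would drain the queue and complete the futures is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical exception type; not part of the Kafka client.
class CannotEnqueueException extends RuntimeException {
    CannotEnqueueException(String message) {
        super(message);
    }
}

// Hypothetical sender with a bounded queue; the draining thread is omitted.
class BoundedSender {
    private final BlockingQueue<String> queue;

    BoundedSender(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Option A: throw directly to the caller when the queue is full.
    CompletableFuture<Void> sendOrThrow(String record) {
        if (!queue.offer(record)) {
            throw new CannotEnqueueException("queue is full");
        }
        // Would be completed later by the (omitted) sender thread.
        return new CompletableFuture<>();
    }

    // Option B: never throw; hand back a future that has already failed.
    CompletableFuture<Void> sendOrFail(String record) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (!queue.offer(record)) {
            result.completeExceptionally(new CannotEnqueueException("queue is full"));
        }
        return result;
    }
}
```

With option A the caller needs a try/catch around the call, and with option
B the caller has to inspect the returned future before assuming the record
was enqueued.  Neither variant blocks.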

@Matthew:

> 3. Using multiple thread pools is definitely recommended for different
> types of tasks, for serialization which is CPU bound you definitely would
> want to use a bounded thread pool that is fixed by the number of CPU's (or
> something along those lines).
> https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a
> very good guide on this topic
I think this guide is good in general, but I would be hesitant to follow
its guidance re: offloading serialization without benchmarking it.  My
understanding is that context-switches have gotten much cheaper, and that
gains from cache locality are small, but they're not nothing.  Especially
if the workload has a very small serialization cost, I wouldn't be shocked
if offloading made it slower.  I feel pretty strongly that we should do more
research here before unconditionally encouraging serialization in a
threadpool.  If people think it's important to do it here (eg if we think
it would mean another big API change) then we should start thinking about
what benchmarking we can do to gain higher confidence in this kind of
change.  However, I don't think it would change semantics as substantially
as we're proposing here, so I would vote for pushing this to a subsequent
KIP.
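
As a starting point for that kind of benchmarking, here is a very rough
sketch.  It is not a rigorous benchmark (there is no JIT warmup, and
something like JMH would be needed for numbers we could trust), the
serializer is a trivial stand-in, and all names are hypothetical.  It only
shows the shape of the comparison: serializing on the calling thread versus
handing each record to a fixed pool sized to the CPU count and waiting for
the result.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SerdeOffloadTiming {
    // Stand-in for a cheap serializer (assumption: real serdes may cost far more).
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Serialize n records on the calling thread; returns total bytes produced.
    static long inline(int n) {
        long total = 0;
        for (int i = 0; i < n; i++) {
            total += serialize("record-" + i).length;
        }
        return total;
    }

    // Hand each record to the pool and wait for it; returns total bytes produced.
    static long offloaded(int n, ExecutorService pool) throws Exception {
        long total = 0;
        for (int i = 0; i < n; i++) {
            final String value = "record-" + i;
            Callable<byte[]> task = () -> serialize(value);
            Future<byte[]> result = pool.submit(task);
            total += result.get().length;
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            int n = 100_000;
            long t0 = System.nanoTime();
            long a = inline(n);
            long t1 = System.nanoTime();
            long b = offloaded(n, pool);
            long t2 = System.nanoTime();
            System.out.printf("inline: %d ms, offloaded: %d ms, same bytes: %b%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, a == b);
        } finally {
            pool.shutdown();
        }
    }
}
```

Waiting on every record individually is close to the worst case for
offloading, so a real comparison would also want a pipelined variant and a
range of serialization costs.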

> 4. Regarding providing the ability for users to supply their own custom
> ThreadPool this is more of an ergonomics question for the API. Especially
> when it gets to monitoring/tracing, giving the ability for users to provide
> their own custom IO/CPU ThreadPools is ideal however as stated doing so
> means a lot of boilerplatery changes to the API. Typically speaking a lot
> of monitoring/tracing/diagnosing is done on ExecutionContext/ThreadPools
> (at least on a more rudimentary level) and hence allowing users to supply a
> global singleton ThreadPool for IO tasks and another for CPU tasks makes
> their lives a lot easier. However due to the large amount of changes to the
> API, it may be more appropriate to just use internal thread pools (for now)
> since at least it's not any worse than what exists currently and this can
> be an improvement that is done later?
Is there an existing threadpool that you suggest we reuse?  Or are you
imagining that we make our own internal threadpool, and then maybe expose
configuration flags to manipulate it?  For what it's worth, I like having
an internal threadpool (perhaps just ForkJoinPool.commonPool()) and then
providing an option to pass your own threadpool.  That way people who want
finer control can get it, and everyone else can do OK with the default.
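
Roughly what I have in mind, as a sketch only.  OffloadingSender and its
send method are hypothetical names, not a proposed signature, and the
"work" is a placeholder for whatever we end up running off-thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical class for illustration; not part of the Kafka client.
class OffloadingSender {
    private final Executor executor;

    // Default: fall back to the JVM-wide common pool.
    OffloadingSender() {
        this(ForkJoinPool.commonPool());
    }

    // Opt-in: the caller supplies (and owns) the executor.
    OffloadingSender(Executor executor) {
        this.executor = executor;
    }

    // Runs the placeholder work on whichever executor was chosen.
    CompletableFuture<String> send(String record) {
        return CompletableFuture.supplyAsync(() -> "sent:" + record, executor);
    }
}
```

Callers who care about monitoring or tracing can pass an instrumented
executor to the second constructor, and everyone else gets the default
without any extra configuration.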


On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich
<matthew.dedetr...@aiven.io.invalid> wrote:

> Here are my two cents here (note that I am only seeing this on a surface
> level)
>
> 1. If we are going this road it makes sense to do this "properly" (i.e.
> using queues as Ryanne suggested). The reason I am saying this is that it
> seems that the original goal of the KIP is for it to be used in other
> asynchronous systems and from my personal experience, you really do need to
> make the implementation properly asynchronous otherwise it's really not
> that useful.
> 2. Due to the previous point and what was said by others, this is likely
> going to break some existing semantics (i.e. people are currently relying
> on blocking semantics) so adding another method/interface plus
> deprecating the older one is more annoying but ideal.
> 3. Using multiple thread pools is definitely recommended for different
> types of tasks, for serialization which is CPU bound you definitely would
> want to use a bounded thread pool that is fixed by the number of CPU's (or
> something along those lines).
> https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a
> very good guide on this topic
> 4. Regarding providing the ability for users to supply their own custom
> ThreadPool this is more of an ergonomics question for the API. Especially
> when it gets to monitoring/tracing, giving the ability for users to provide
> their own custom IO/CPU ThreadPools is ideal however as stated doing so
> means a lot of boilerplatery changes to the API. Typically speaking a lot
> of monitoring/tracing/diagnosing is done on ExecutionContext/ThreadPools
> (at least on a more rudimentary level) and hence allowing users to supply a
> global singleton ThreadPool for IO tasks and another for CPU tasks makes
> their lives a lot easier. However due to the large amount of changes to the
> API, it may be more appropriate to just use internal thread pools (for now)
> since at least it's not any worse than what exists currently and this can
> be an improvement that is done later?
>
> On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > I was thinking the sender would typically wrap send() in a backoff/retry
> > loop, or else ignore any failures and drop sends on the floor
> > (fire-and-forget), and in both cases I think failing immediately is better
> > than blocking for a new spot in the queue or asynchronously failing
> > somehow.
> >
> > I think a failed future is adequate for the "explicit backpressure signal"
> > while avoiding any blocking anywhere. I think if we try to asynchronously
> > signal the caller of failure (either by asynchronously failing the future
> > or invoking a callback off-thread or something) we'd force the caller to
> > either block or poll waiting for that signal, which somewhat defeats the
> > purpose we're after. And of course blocking for a spot in the queue
> > definitely defeats the purpose (tho perhaps ameliorates the problem some).
> >
> > Throwing an exception to the caller directly (not via the future) is
> > another option with precedent in Kafka clients, tho it doesn't seem as
> > ergonomic to me.
> >
> > It would be interesting to analyze some existing usage and determine how
> > difficult it would be to convert it to the various proposed APIs.
> >
> > Ryanne
> >
> > On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> wrote:
> >
> > > Hi Ryanne,
> > >
> > > Hmm, that's an interesting idea.  Basically it would mean that after
> > > calling send, you would also have to check whether the returned future
> > > had failed with a specific exception.  I would be open to it, although I
> > > think it might be slightly more surprising, since right now the paradigm
> > > is "enqueue synchronously, the future represents whether we succeeded in
> > > sending or not" and the new one would be "enqueue synchronously, the
> > > future either represents whether we succeeded in enqueueing or not (in
> > > which case it will be failed immediately if it failed to enqueue) or
> > > whether we succeeded in sending or not".
> > >
> > > But you're right, it should be on the table, thank you for suggesting it!
> > >
> > > Best,
> > > Moses
> > >
> > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > > > Moses, in the case of a full queue, could we just return a failed
> > > > future immediately?
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:
> > > >
> > > > > Hi Alexandre,
> > > > >
> > > > > Thanks for bringing this up, I think I could use some feedback in this
> > > > > area.  There are two mechanisms here, one for slowing down when we
> > > > > don't have the relevant metadata, and the other for slowing down when a
> > > > > queue has filled up.  Although the first one applies backpressure
> > > > > somewhat inadvertently, we could still get in trouble if we're not
> > > > > providing information to the mechanism that monitors whether we're
> > > > > queueing too much.  As for the second one, that is a classic
> > > > > backpressure use case, so it's definitely important that we don't drop
> > > > > that ability.
> > > > >
> > > > > Right now backpressure is applied by blocking, which is a natural way
> > > > > to apply backpressure in synchronous systems, but can lead to
> > > > > unnecessary slowdowns in asynchronous systems.  In my opinion, the
> > > > > safest way to apply backpressure in an asynchronous model is to have an
> > > > > explicit backpressure signal.  A good example would be returning an
> > > > > exception, and providing an optional hook to add a callback onto so
> > > > > that you can be notified when it's ready to accept more messages.
> > > > >
> > > > > However, this would be a really big change to how users use
> > > > > KafkaProducer#send, so I don't know how much appetite we have for
> > > > > making that kind of change.  Maybe it would be simpler to remove the
> > > > > "don't block when the per-topic queue is full" from the scope of this
> > > > > KIP, and only focus on when metadata is available?  The downside is
> > > > > that we will probably want to change the API again later to fix this,
> > > > > so it might be better to just rip the bandaid off now.
> > > > >
> > > > > One slightly nasty thing here is that because queueing order is
> > > > > important, if we want to use exceptions, we will want to be able to
> > > > > signal the failure to enqueue to the caller in such a way that they can
> > > > > still enforce message order if they want.  So we can't embed the
> > > > > failure directly in the returned future, we should either return two
> > > > > futures (nested, or as a tuple) or else throw an exception to explain a
> > > > > backpressure.
> > > > >
> > > > > So there are a few things we should work out here:
> > > > >
> > > > > 1. Should we keep the "too many bytes enqueued" part of this in scope?
> > > > > (I would say yes, so that we can minimize churn in this API)
> > > > > 2. How should we signal backpressure so that it's appropriate for
> > > > > asynchronous systems?  (I would say that we should throw an exception.
> > > > > If we choose this and we want to pursue the queueing path, we would
> > > > > *not* want to enqueue messages that would push us over the limit, and
> > > > > would only want to enqueue messages when we're waiting for metadata,
> > > > > and we would want to keep track of the total number of bytes for those
> > > > > messages).
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Best,
> > > > > Moses
> > > > >
> > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
> > > > > alexandre.dupr...@gmail.com> wrote:
> > > > >
> > > > > > Hello Nakamura,
> > > > > >
> > > > > > Thanks for proposing this change. I can see how the blocking
> > > > > > behaviour can be a problem when integrating with reactive frameworks
> > > > > > such as Akka. One of the questions I would have is how you would
> > > > > > handle back pressure and avoid memory exhaustion when the producer's
> > > > > > buffer is full and tasks would start to accumulate in the out-of-band
> > > > > > queue or thread pool introduced with this KIP.
> > > > > >
> > > > > > Thanks,
> > > > > > Alexandre
> > > > > >
> > > > > > On Fri, May 14, 2021 at 3:55 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Makes sense!
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Ryanne,
> > > > > > > >
> > > > > > > > I see what you're saying about serde blocking, but I think we
> > > > > > > > should consider it out of scope for this patch.  Right now we've
> > > > > > > > nailed down a couple of use cases where we can unambiguously say,
> > > > > > > > "I can make progress now" or "I cannot make progress now", which
> > > > > > > > makes it possible to offload to a different thread only if we are
> > > > > > > > unable to make progress.  Extending this to CPU work like serde
> > > > > > > > would mean always offloading, which would be a really big
> > > > > > > > performance change.  It might be worth exploring anyway, but I'd
> > > > > > > > rather keep this patch focused on improving ergonomics, rather
> > > > > > > > than muddying the waters with evaluating performance very deeply.
> > > > > > > >
> > > > > > > > I think if we really do want to support serde or interceptors
> > > > > > > > that do IO on the send path (which seems like an anti-pattern to
> > > > > > > > me), we should consider making that a separate KIP, and probably
> > > > > > > > also consider changing the API to use Futures (or
> > > > > > > > CompletionStages).  But I would rather avoid scope creep, so that
> > > > > > > > we have a better chance of fixing this part of the problem.
> > > > > > > >
> > > > > > > > Yes, I think some exceptions will move to being async instead of
> > > > > > > > sync.  They'll still be surfaced in the Future, so I'm not so
> > > > > > > > confident that it would be that big a shock to users though.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Moses
> > > > > > > >
> > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan
> > > > > > > > <ryannedo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > re serialization, my concern is that serialization often
> > > > > > > > > accounts for a lot of the cycles spent before returning the
> > > > > > > > > future. It's not blocking per se, but it's the same effect from
> > > > > > > > > the caller's perspective.
> > > > > > > > >
> > > > > > > > > Moreover, serde impls often block themselves, e.g. when
> > > > > > > > > fetching schemas from a registry. I suppose it's also possible
> > > > > > > > > to block in Interceptors (e.g. writing audit events or
> > > > > > > > > metrics), which happens before serdes iiuc. So any blocking in
> > > > > > > > > either of those plugins would block the send unless we queue
> > > > > > > > > first.
> > > > > > > > >
> > > > > > > > > So I think we want to queue first and do everything off-thread
> > > > > > > > > when using the new API, whatever that looks like. I just want
> > > > > > > > > to make sure we don't do that for clients that wouldn't expect
> > > > > > > > > it.
> > > > > > > > >
> > > > > > > > > Another consideration is exception handling. If we queue right
> > > > > > > > > away, we'll defer some exceptions that currently are thrown to
> > > > > > > > > the caller (before the future is returned). In the new API, the
> > > > > > > > > send() wouldn't throw any exceptions, and instead the future
> > > > > > > > > would fail. I think that might mean that a new method signature
> > > > > > > > > is required.
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura
> > > > > > > > > <nakamura.mo...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Ryanne,
> > > > > > > > > >
> > > > > > > > > > I agree we should add an additional constructor (or else an
> > > > > > > > > > additional overload in KafkaProducer#send, but the new
> > > > > > > > > > constructor would be easier to understand) if we're targeting
> > > > > > > > > > the "user provides the thread" approach.
> > > > > > > > > >
> > > > > > > > > > From looking at the code, I think we can keep record
> > > > > > > > > > serialization on the user thread, if we consider that an
> > > > > > > > > > important part of the semantics of this method.  It doesn't
> > > > > > > > > > seem like serialization depends on knowing the cluster, I
> > > > > > > > > > think it's incidental that it comes after the first
> > > > > > > > > > "blocking" segment in the method.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Moses
> > > > > > > > > >
> > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan
> > > > > > > > > > <ryannedo...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Moses, I like the direction here. My thinking is that a
> > > > > > > > > > > single additional work queue, s.t. send() can enqueue and
> > > > > > > > > > > return, seems like the lightest touch. However, I don't
> > > > > > > > > > > think we can trivially process that queue in an internal
> > > > > > > > > > > thread pool without subtly changing behavior for some
> > > > > > > > > > > users. For example, users will often run send() in multiple
> > > > > > > > > > > threads in order to serialize faster, but that wouldn't
> > > > > > > > > > > work quite the same if there were an internal thread pool.
> > > > > > > > > > >
> > > > > > > > > > > For this reason I'm thinking we need to make sure any such
> > > > > > > > > > > changes are opt-in. Maybe a new constructor with an
> > > > > > > > > > > additional ThreadFactory parameter. That would at least
> > > > > > > > > > > clearly indicate that work will happen off-thread, and
> > > > > > > > > > > would require opt-in for the new behavior.
> > > > > > > > > > >
> > > > > > > > > > > Under the hood, this ThreadFactory could be used to create
> > > > > > > > > > > the worker thread that processes queued sends, which could
> > > > > > > > > > > fan-out to per-partition threads from there.
> > > > > > > > > > >
> > > > > > > > > > > So then you'd have two ways to send: the existing way,
> > > > > > > > > > > where serde and interceptors and whatnot are executed on
> > > > > > > > > > > the calling thread, and the new way, which returns right
> > > > > > > > > > > away and uses an internal Executor. As you point out, the
> > > > > > > > > > > semantics would be identical in either case, and it would
> > > > > > > > > > > be very easy for clients to switch.
> > > > > > > > > > >
> > > > > > > > > > > Ryanne
> > > > > > > > > > >
> > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Folks,
> > > > > > > > > > > > I just posted a new proposal
> > > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
> > > > > > > > > > > > in the wiki.  I think we have an opportunity to improve
> > > > > > > > > > > > the KafkaProducer#send user experience.  It would
> > > > > > > > > > > > certainly make our lives easier.  Please take a look!
> > > > > > > > > > > > There are two subproblems that I could use guidance on,
> > > > > > > > > > > > so I would appreciate feedback on both of them.
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Moses
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Matthew de Detrich
>
> *Aiven Deutschland GmbH*
>
> Immanuelkirchstraße 26, 10405 Berlin
>
> Amtsgericht Charlottenburg, HRB 209739 B
>
> *m:* +491603708037
>
> *w:* aiven.io *e:* matthew.dedetr...@aiven.io
>
