Hi Ryanne,

Hmm, that's an interesting idea.  Basically it would mean that after
calling send, you would also have to check whether the returned future had
failed with a specific exception.  I would be open to it, although I think
it might be slightly more surprising, since right now the paradigm is
"enqueue synchronously, the future represents whether we succeeded in
sending or not" and the new one would be "enqueue synchronously, the future
either represents whether we succeeded in enqueueing or not (in which case
it will be failed immediately if it failed to enqueue) or whether we
succeeded in sending or not".

But you're right, it should be on the table, thank you for suggesting it!

Best,
Moses

On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Moses, in the case of a full queue, could we just return a failed future
> immediately?
>
> Ryanne
>
> On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:
>
> > Hi Alexandre,
> >
> > Thanks for bringing this up, I think I could use some feedback in this
> > area.  There are two mechanisms here, one for slowing down when we don't
> > have the relevant metadata, and the other for slowing down when a queue
> has
> > filled up.  Although the first one applies backpressure somewhat
> > inadvertently, we could still get in trouble if we're not providing
> > information to the mechanism that monitors whether we're queueing too
> > much.  As for the second one, that is a classic backpressure use case, so
> > it's definitely important that we don't drop that ability.
> >
> > Right now backpressure is applied by blocking, which is a natural way to
> > apply backpressure in synchronous systems, but can lead to unnecessary
> > slowdowns in asynchronous systems.  In my opinion, the safest way to
> apply
> > backpressure in an asynchronous model is to have an explicit backpressure
> > signal.  A good example would be returning an exception, and providing an
> > optional hook to add a callback onto so that you can be notified when
> it's
> > ready to accept more messages.
> >
> > However, this would be a really big change to how users use
> > KafkaProducer#send, so I don't know how much appetite we have for making
> > that kind of change.  Maybe it would be simpler to remove the "don't
> block
> > when the per-topic queue is full" from the scope of this KIP, and only
> > focus on when metadata is available?  The downside is that we will
> probably
> > want to change the API again later to fix this, so it might be better to
> > just rip the bandaid off now.
> >
> > One slightly nasty thing here is that because queueing order is
> important,
> > if we want to use exceptions, we will want to be able to signal the
> failure
> > to enqueue to the caller in such a way that they can still enforce
> message
> > order if they want.  So we can't embed the failure directly in the
> returned
> > future, we should either return two futures (nested, or as a tuple) or
> else
> > throw an exception to explain a backpressure.
> >
> > So there are a few things we should work out here:
> >
> > 1. Should we keep the "too many bytes enqueued" part of this in scope?
> (I
> > would say yes, so that we can minimize churn in this API)
> > 2. How should we signal backpressure so that it's appropriate for
> > asynchronous systems?  (I would say that we should throw an exception.
> If
> > we choose this and we want to pursue the queueing path, we would *not*
> want
> > to enqueue messages that would push us over the limit, and would only
> want
> > to enqueue messages when we're waiting for metadata, and we would want to
> > keep track of the total number of bytes for those messages).
> >
> > What do you think?
> >
> > Best,
> > Moses
> >
> > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> > > Hello Nakamura,
> > >
> > > Thanks for proposing this change. I can see how the blocking behaviour
> > > can be a problem when integrating with reactive frameworks such as
> > > Akka. One of the questions I would have is how you would handle back
> > > pressure and avoid memory exhaustion when the producer's buffer is
> > > full and tasks would start to accumulate in the out-of-band queue or
> > > thread pool introduced with this KIP.
> > >
> > > Thanks,
> > > Alexandre
> > >
> > > Le ven. 14 mai 2021 à 15:55, Ryanne Dolan <ryannedo...@gmail.com> a
> > écrit
> > > :
> > > >
> > > > Makes sense!
> > > >
> > > > Ryanne
> > > >
> > > > On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:
> > > >
> > > > > Hey Ryanne,
> > > > >
> > > > > I see what you're saying about serde blocking, but I think we
> should
> > > > > consider it out of scope for this patch.  Right now we've nailed
> > down a
> > > > > couple of use cases where we can unambiguously say, "I can make
> > > progress
> > > > > now" or "I cannot make progress now", which makes it possible to
> > > offload to
> > > > > a different thread only if we are unable to make progress.
> Extending
> > > this
> > > > > to CPU work like serde would mean always offloading, which would
> be a
> > > > > really big performance change.  It might be worth exploring anyway,
> > > but I'd
> > > > > rather keep this patch focused on improving ergonomics, rather than
> > > > > muddying the waters with evaluating performance very deeply.
> > > > >
> > > > > I think if we really do want to support serde or interceptors that
> do
> > > IO on
> > > > > the send path (which seems like an anti-pattern to me), we should
> > > consider
> > > > > making that a separate SIP, and probably also consider changing the
> > > API to
> > > > > use Futures (or CompletionStages).  But I would rather avoid scope
> > > creep,
> > > > > so that we have a better chance of fixing this part of the problem.
> > > > >
> > > > > Yes, I think some exceptions will move to being async instead of
> > sync.
> > > > > They'll still be surfaced in the Future, so I'm not so confident
> that
> > > it
> > > > > would be that big a shock to users though.
> > > > >
> > > > > Best,
> > > > > Moses
> > > > >
> > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <
> ryannedo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > re serialization, my concern is that serialization often accounts
> > > for a
> > > > > lot
> > > > > > of the cycles spent before returning the future. It's not
> blocking
> > > per
> > > > > se,
> > > > > > but it's the same effect from the caller's perspective.
> > > > > >
> > > > > > Moreover, serde impls often block themselves, e.g. when fetching
> > > schemas
> > > > > > from a registry. I suppose it's also possible to block in
> > > Interceptors
> > > > > > (e.g. writing audit events or metrics), which happens before
> serdes
> > > iiuc.
> > > > > > So any blocking in either of those plugins would block the send
> > > unless we
> > > > > > queue first.
> > > > > >
> > > > > > So I think we want to queue first and do everything off-thread
> when
> > > using
> > > > > > the new API, whatever that looks like. I just want to make sure
> we
> > > don't
> > > > > do
> > > > > > that for clients that wouldn't expect it.
> > > > > >
> > > > > > Another consideration is exception handling. If we queue right
> > away,
> > > > > we'll
> > > > > > defer some exceptions that currently are thrown to the caller
> > > (before the
> > > > > > future is returned). In the new API, the send() wouldn't throw
> any
> > > > > > exceptions, and instead the future would fail. I think that might
> > > mean
> > > > > that
> > > > > > a new method signature is required.
> > > > > >
> > > > > > Ryanne
> > > > > >
> > > > > >
> > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com
> >
> > > wrote:
> > > > > >
> > > > > > > Hey Ryanne,
> > > > > > >
> > > > > > > I agree we should add an additional constructor (or else an
> > > additional
> > > > > > > overload in KafkaProducer#send, but the new constructor would
> be
> > > easier
> > > > > > to
> > > > > > > understand) if we're targeting the "user provides the thread"
> > > approach.
> > > > > > >
> > > > > > > From looking at the code, I think we can keep record
> > serialization
> > > on
> > > > > the
> > > > > > > user thread, if we consider that an important part of the
> > > semantics of
> > > > > > this
> > > > > > > method.  It doesn't seem like serialization depends on knowing
> > the
> > > > > > cluster,
> > > > > > > I think it's incidental that it comes after the first
> "blocking"
> > > > > segment
> > > > > > in
> > > > > > > the method.
> > > > > > >
> > > > > > > Best,
> > > > > > > Moses
> > > > > > >
> > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <
> > > ryannedo...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Moses, I like the direction here. My thinking is that a
> > > single
> > > > > > > > additional work queue, s.t. send() can enqueue and return,
> > seems
> > > like
> > > > > > the
> > > > > > > > lightest touch. However, I don't think we can trivially
> process
> > > that
> > > > > > > queue
> > > > > > > > in an internal thread pool without subtly changing behavior
> for
> > > some
> > > > > > > users.
> > > > > > > > For example, users will often run send() in multiple threads
> in
> > > order
> > > > > > to
> > > > > > > > serialize faster, but that wouldn't work quite the same if
> > there
> > > were
> > > > > > an
> > > > > > > > internal thread pool.
> > > > > > > >
> > > > > > > > For this reason I'm thinking we need to make sure any such
> > > changes
> > > > > are
> > > > > > > > opt-in. Maybe a new constructor with an additional
> > ThreadFactory
> > > > > > > parameter.
> > > > > > > > That would at least clearly indicate that work will happen
> > > > > off-thread,
> > > > > > > and
> > > > > > > > would require opt-in for the new behavior.
> > > > > > > >
> > > > > > > > Under the hood, this ThreadFactory could be used to create
> the
> > > worker
> > > > > > > > thread that process queued sends, which could fan-out to
> > > > > per-partition
> > > > > > > > threads from there.
> > > > > > > >
> > > > > > > > So then you'd have two ways to send: the existing way, where
> > > serde
> > > > > and
> > > > > > > > interceptors and whatnot are executed on the calling thread,
> > and
> > > the
> > > > > > new
> > > > > > > > way, which returns right away and uses an internal Executor.
> As
> > > you
> > > > > > point
> > > > > > > > out, the semantics would be identical in either case, and it
> > > would be
> > > > > > > very
> > > > > > > > easy for clients to switch.
> > > > > > > >
> > > > > > > > Ryanne
> > > > > > > >
> > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com>
> > wrote:
> > > > > > > >
> > > > > > > > > Hey Folks,
> > > > > > > > > I just posted a new proposal
> > > > > > > > > <
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
> > > > > > > > > >
> > > > > > > > > in the wiki.  I think we have an opportunity to improve the
> > > > > > > > > KafkaProducer#send user experience.  It would certainly
> make
> > > our
> > > > > > lives
> > > > > > > > > easier.  Please take a look!  There are two subproblems
> that
> > I
> > > > > could
> > > > > > > use
> > > > > > > > > guidance on, so I would appreciate feedback on both of
> them.
> > > > > > > > > Best,
> > > > > > > > > Moses
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
>

Reply via email to