Josep, that is being done as KIP-707. Looking forward to that as well :)

Ryanne
On Wed, May 26, 2021, 9:08 AM Josep Prat <josep.p...@aiven.io.invalid> wrote:

Sorry, I meant `CompletionStage` (instead of CompletableFuture) as this is the interface.

Best,
———
Josep Prat
Aiven Deutschland GmbH
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
m: +491715557497
w: aiven.io
e: josep.p...@aiven.io

On Wed, May 26, 2021, 15:36 Josep Prat <josep.p...@aiven.io> wrote:

Hi,

If I may, I would like to suggest that instead of using Java's `Future` class in the APIs, it would be better to use `CompletableFuture`. This would offer the possibility of applying callbacks on its completion, for example.

Best,

On Wed, May 26, 2021 at 3:28 PM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

Maybe there was a miscommunication, but I agree with everything you said; I was just clarifying what my definition of blocking is (because I think there was a misunderstanding).

And yes, you are right, there is a limited number of threads, which is why blocking is a bad thing: having threads sitting around waiting/not doing anything is a waste of resources. Ultimately this is also a performance problem, because if you don't block you can simply process more IO tasks on a given machine/instance (hence greater performance).

In any case, as was clarified, the current behavior of send() needs to be fixed. It returns a Future, but since it internally blocks on the caller's thread, from an API perspective it gives the incorrect impression that it's asynchronous (when it's not).

On Wed, May 26, 2021 at 3:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Matthew, it's more than performance tho. In many frameworks the number of request threads is purposefully constrained, and blocking one means you have one less to handle requests with. When you're handling a large number of requests with a small number of threads, any blocking can lead to thread exhaustion.

For this reason, you'll often see send() wrapped in a future or thread pool. But it's surprising that this would be required, since send() already returns a future.

Additionally, even when send() does not actually block, it does a lot of work on the caller's thread, which is likewise surprising given a future is returned. The effect is the same: fewer threads are available to handle requests, and you risk thread exhaustion.

I think we may incidentally improve performance if we introduce an internal thread pool, but the primary motivation here, at least for me, is to fix the lie the API is telling, not to improve performance.

Ryanne

On Wed, May 26, 2021, 6:51 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

I think we may need to clarify terminology here; at least to me, blocking means suspending a current thread to wait for some operation (which is wasteful if we are dealing with IO-bound tasks). In other words, the "blocking" is an implementation detail of how to wait rather than whether we need to wait or not, so to me this is more of a performance question.

In the scenario you describe of Kafka clients producing too many messages, as you said, buffering is what should be done, but I wouldn't classify this as blocking.
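The wrapping pattern Ryanne mentions (send() wrapped in a future or thread pool so a potentially blocking call never ties up a request thread) can be sketched as follows. `BlockingSend` and all other names here are hypothetical stand-ins for illustration, not the real KafkaProducer API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SendWrapper {
    // Stand-in for a send() that may block the calling thread
    // (e.g. on a metadata fetch or a full buffer).
    interface BlockingSend {
        String send(String record) throws InterruptedException;
    }

    // The workaround the thread describes: move the potentially blocking
    // call onto a dedicated pool so request threads are never blocked.
    static CompletableFuture<String> asyncSend(BlockingSend client,
                                               String record,
                                               ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return client.send(record);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BlockingSend slowClient = record -> {
            Thread.sleep(100); // simulate internal blocking
            return "acked:" + record;
        };
        CompletableFuture<String> f = asyncSend(slowClient, "hello", pool);
        // The caller's thread returns immediately; the result arrives later.
        System.out.println(f.join());
        pool.shutdown();
    }
}
```

This illustrates Ryanne's point: the caller must pay for an extra pool and a thread hop even though send() already returns a future.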
On Mon, May 24, 2021 at 7:54 PM Colin McCabe <cmcc...@apache.org> wrote:

Hi all,

I agree that we should give users the option of having a fully async API, but I don't think external thread pools or queues are the right direction to go here. They add performance overhead and don't address the root causes of the problem.

There are basically two scenarios where we block currently. One is when we are doing a metadata fetch. I think this is clearly a bug, or at least an implementation limitation. From the user's point of view, the fact that we are doing a metadata fetch is an implementation detail that really shouldn't be exposed like this. We have talked about fixing this in the past. I think we should just spend the time to do it.

The second scenario is where the client has produced too much data in too little time. This could happen if there is a network glitch, or the server is slower than expected. In this case, the behavior is intentional and not a bug. To understand this, think about what would happen if we didn't block. We would start buffering more and more data in memory, until finally the application died with an out-of-memory error. That would be frustrating for users and wouldn't add to the usability of Kafka.

We could potentially have an option to handle the out-of-memory scenario differently, by returning an error code immediately rather than blocking. Applications would have to be rewritten to handle this properly, but it is a possibility. I suspect that most of them wouldn't use this, but we could offer it as a possibility for async purists (which might include certain frameworks). The big problem the users would have to solve is what to do with the record that they were unable to produce due to the buffer-full issue.

best,
Colin

On Thu, May 20, 2021, at 10:35, Nakamura wrote:

> My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various thread pools configurable with a sane default.

I like that approach. I updated the "Which thread should be responsible for waiting" part of KIP-739 to add your suggestion as my recommended approach, thank you! If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".

On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

@Nakamura

On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:

> @Ryanne:
> In my mind's eye I slightly prefer throwing the "cannot enqueue" exception to satisfying the future immediately with the "cannot enqueue" exception. But I agree, it would be worth doing more research.
>
> @Matthew:
>
> > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.
>
> I think this guide is good in general, but I would be hesitant to follow its guidance re: offloading serialization without benchmarking it. My understanding is that context switches have gotten much cheaper, and that gains from cache locality are small, but they're not nothing. Especially if the workload has a very small serialization cost, I wouldn't be shocked if it made it slower. I feel pretty strongly that we should do more research here before unconditionally encouraging serialization in a threadpool. If people think it's important to do it here (e.g. if we think it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change. However, I don't think it would change semantics as substantially as we're proposing here, so I would vote for pushing this to a subsequent KIP.

Of course, it's all down to benchmarking, benchmarking and benchmarking. Ideally speaking you want to use all of the resources available to you, so if you have a bottleneck in serialization and you have many cores free, then using multiple cores may be more appropriate than a single core. Typically I would expect that using a single thread to do serialization is likely to be the most common situation; I was just responding to an earlier point that was made in regards to using thread pools for serialization (note that you can also just use a ThreadPool that is pinned to a single thread).

> > 4. Regarding providing the ability for users to supply their own custom ThreadPool: this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplatery changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?
>
> Is there an existing threadpool that you suggest we reuse? Or are you imagining that we make our own internal threadpool, and then maybe expose configuration flags to manipulate it? For what it's worth, I like having an internal threadpool (perhaps just FJP.commonPool) and then providing an alternative to pass your own threadpool. That way people who want finer control can get it, and everyone else can do OK with the default.

Indeed, that is what I am saying. The most ideal situation is that there is a default internal threadpool that Kafka uses, but users of Kafka can configure their own threadpool. Having a singleton ThreadPool for blocking IO, non-blocking IO and CPU-bound tasks, which can be plugged into all of your libraries (including Kafka), makes resource management much easier to do, and also gives users control to override specific threadpools for exceptional cases (i.e. providing a threadpool that is pinned to a single core, which tends to give the best latency results if that is something critical for you).

My suggestion was just to do this in multiple steps/phases: firstly let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.

On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

Here are my two cents (note that I am only seeing this on a surface level):

1. If we are going down this road, it makes sense to do this "properly" (i.e. using queues as Ryanne suggested). The reason I am saying this is that it seems the original goal of the KIP is for it to be used in other asynchronous systems, and from my personal experience you really do need to make the implementation properly asynchronous, otherwise it's really not that useful.
2. Due to the previous point and what was said by others, this is likely going to break some existing semantics (i.e. people are currently relying on blocking semantics), so adding another method/interface plus deprecating the older one is more annoying but ideal.
3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU-bound, you definitely would want to use a bounded thread pool that is fixed by the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.
4. Regarding providing the ability for users to supply their own custom ThreadPool: this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplatery changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least on a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least it's not any worse than what exists currently, and this can be an improvement that is done later?
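Point 3 above (a bounded pool fixed by the number of CPUs for CPU-bound work such as serialization) might look like the sketch below; `serialize` is a placeholder for whatever CPU-bound work gets offloaded, and none of these names come from the Kafka codebase:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CpuBoundPool {
    // Bounded pool sized to the number of CPUs, as recommended for
    // CPU-bound tasks like serialization.
    static final ExecutorService CPU_POOL =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Placeholder for a CPU-bound serializer.
    static byte[] serialize(String record) {
        return record.getBytes(StandardCharsets.UTF_8);
    }

    static CompletableFuture<byte[]> serializeAsync(String record) {
        return CompletableFuture.supplyAsync(() -> serialize(record), CPU_POOL);
    }

    public static void main(String[] args) {
        byte[] bytes = serializeAsync("hello").join();
        System.out.println(bytes.length);
        CPU_POOL.shutdown();
    }
}
```

As Nakamura cautions above, whether offloading like this actually wins depends on the serialization cost per record; for cheap serializers, the thread hop may dominate.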
On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

I was thinking the sender would typically wrap send() in a backoff/retry loop, or else ignore any failures and drop sends on the floor (fire-and-forget), and in both cases I think failing immediately is better than blocking for a new spot in the queue or asynchronously failing somehow.

I think a failed future is adequate for the "explicit backpressure signal" while avoiding any blocking anywhere. I think if we try to asynchronously signal the caller of failure (either by asynchronously failing the future or invoking a callback off-thread or something) we'd force the caller to either block or poll waiting for that signal, which somewhat defeats the purpose we're after. And of course blocking for a spot in the queue definitely defeats the purpose (tho perhaps ameliorates the problem some).

Throwing an exception to the caller directly (not via the future) is another option with precedent in Kafka clients, tho it doesn't seem as ergonomic to me.

It would be interesting to analyze some existing usage and determine how difficult it would be to convert it to the various proposed APIs.

Ryanne

On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> wrote:

Hi Ryanne,

Hmm, that's an interesting idea. Basically it would mean that after calling send, you would also have to check whether the returned future had failed with a specific exception. I would be open to it, although I think it might be slightly more surprising, since right now the paradigm is "enqueue synchronously; the future represents whether we succeeded in sending or not", and the new one would be "enqueue synchronously; the future either represents whether we succeeded in enqueueing (in which case it will be failed immediately if it failed to enqueue) or whether we succeeded in sending or not".

But you're right, it should be on the table, thank you for suggesting it!

Best,
Moses

On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Moses, in the case of a full queue, could we just return a failed future immediately?

Ryanne

On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:

Hi Alexandre,

Thanks for bringing this up; I think I could use some feedback in this area. There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up. Although the first one applies backpressure somewhat inadvertently, we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much. As for the second one, that is a classic backpressure use case, so it's definitely important that we don't drop that ability.

Right now backpressure is applied by blocking, which is a natural way to apply backpressure in synchronous systems, but can lead to unnecessary slowdowns in asynchronous systems. In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal. A good example would be returning an exception, and providing an optional hook to add a callback onto, so that you can be notified when it's ready to accept more messages.

However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change. Maybe it would be simpler to remove the "don't block when the per-topic queue is full" part from the scope of this KIP, and only focus on when metadata is available? The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.

One slightly nasty thing here is that because queueing order is important, if we want to use exceptions, we will want to be able to signal the failure to enqueue to the caller in such a way that they can still enforce message order if they want. So we can't embed the failure directly in the returned future; we should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.

So there are a few things we should work out here:

1. Should we keep the "too many bytes enqueued" part of this in scope? (I would say yes, so that we can minimize churn in this API.)
2. How should we signal backpressure so that it's appropriate for asynchronous systems? (I would say that we should throw an exception. If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit; we would only want to enqueue messages when we're waiting for metadata, and we would want to keep track of the total number of bytes for those messages.)

What do you think?

Best,
Moses

On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hello Nakamura,

Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One of the questions I would have is how you would handle backpressure and avoid memory exhaustion when the producer's buffer is full and tasks would start to accumulate in the out-of-band queue or thread pool introduced with this KIP.
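Ryanne's "failed future immediately on a full queue" idea could be sketched as below. This is a toy bounded queue, not the actual producer accumulator, and `BufferFullException` is a made-up name for the explicit backpressure signal:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public class BoundedEnqueue {
    // Made-up exception type for the explicit backpressure signal.
    static class BufferFullException extends RuntimeException {
        BufferFullException(String msg) { super(msg); }
    }

    final BlockingQueue<String> queue;

    BoundedEnqueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Instead of blocking when the buffer is full, fail the returned
    // future immediately so async callers get an explicit, non-blocking signal.
    CompletableFuture<String> send(String record) {
        if (!queue.offer(record)) {
            return CompletableFuture.failedFuture(
                new BufferFullException("buffer full, cannot enqueue"));
        }
        // In a real client the future would complete once the broker acks;
        // here it is left pending for a background sender to complete.
        return new CompletableFuture<>();
    }

    public static void main(String[] args) {
        BoundedEnqueue producer = new BoundedEnqueue(1);
        CompletableFuture<String> first = producer.send("a");   // enqueued
        CompletableFuture<String> second = producer.send("b");  // queue full
        System.out.println(first.isDone() + " " + second.isCompletedExceptionally());
    }
}
```

Callers would check `isCompletedExceptionally()` (or handle the exception in a callback) and back off or retry, which is the trade-off Nakamura describes: the future now signals either enqueue failure or send failure.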
> >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Thanks, > >> > > > > > > > > > > > > Alexandre > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Le ven. 14 mai 2021 à 15:55, Ryanne Dolan < > >> > > > > > > ryannedo...@gmail.com > >> > > > > > > > > > >> > > > > > > > > a > >> > > > > > > > > > > > écrit > >> > > > > > > > > > > > > : > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Makes sense! > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Ryanne > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura < > >> > > > nny...@gmail.com> > >> > > > > > > > wrote: > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Hey Ryanne, > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > I see what you're saying about serde > blocking, > >> > but > >> > > I > >> > > > > > think > >> > > > > > > we > >> > > > > > > > > > > should > >> > > > > > > > > > > > > > > consider it out of scope for this patch. > >> Right > >> > now > >> > > > we've > >> > > > > > > > > nailed > >> > > > > > > > > > > > down a > >> > > > > > > > > > > > > > > couple of use cases where we can > unambiguously > >> > say, > >> > > > "I > >> > > > > > can > >> > > > > > > > make > >> > > > > > > > > > > > > progress > >> > > > > > > > > > > > > > > now" or "I cannot make progress now", which > >> makes > >> > > it > >> > > > > > > possible > >> > > > > > > > > to > >> > > > > > > > > > > > > offload to > >> > > > > > > > > > > > > > > a different thread only if we are unable to > >> make > >> > > > > > progress. > >> > > > > > > > > > > Extending > >> > > > > > > > > > > > > this > >> > > > > > > > > > > > > > > to CPU work like serde would mean always > >> > > offloading, > >> > > > > > which > >> > > > > > > > > would > >> > > > > > > > > > > be a > >> > > > > > > > > > > > > > > really big performance change. 
It might be > >> worth > >> > > > > > exploring > >> > > > > > > > > > anyway, > >> > > > > > > > > > > > > but I'd > >> > > > > > > > > > > > > > > rather keep this patch focused on improving > >> > > > ergonomics, > >> > > > > > > > rather > >> > > > > > > > > > than > >> > > > > > > > > > > > > > > muddying the waters with evaluating > >> performance > >> > > very > >> > > > > > > deeply. > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > I think if we really do want to support > serde > >> or > >> > > > > > > interceptors > >> > > > > > > > > > that > >> > > > > > > > > > > do > >> > > > > > > > > > > > > IO on > >> > > > > > > > > > > > > > > the send path (which seems like an > >> anti-pattern > >> > to > >> > > > me), > >> > > > > > we > >> > > > > > > > > should > >> > > > > > > > > > > > > consider > >> > > > > > > > > > > > > > > making that a separate SIP, and probably > also > >> > > > consider > >> > > > > > > > changing > >> > > > > > > > > > the > >> > > > > > > > > > > > > API to > >> > > > > > > > > > > > > > > use Futures (or CompletionStages). But I > >> would > >> > > > rather > >> > > > > > > avoid > >> > > > > > > > > > scope > >> > > > > > > > > > > > > creep, > >> > > > > > > > > > > > > > > so that we have a better chance of fixing > this > >> > part > >> > > > of > >> > > > > > the > >> > > > > > > > > > problem. > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Yes, I think some exceptions will move to > >> being > >> > > async > >> > > > > > > instead > >> > > > > > > > > of > >> > > > > > > > > > > > sync. > >> > > > > > > > > > > > > > > They'll still be surfaced in the Future, so > >> I'm > >> > not > >> > > > so > >> > > > > > > > > confident > >> > > > > > > > > > > that > >> > > > > > > > > > > > > it > >> > > > > > > > > > > > > > > would be that big a shock to users though. 
> >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Best, > >> > > > > > > > > > > > > > > Moses > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne > Dolan < > >> > > > > > > > > > > ryannedo...@gmail.com> > >> > > > > > > > > > > > > > > wrote: > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > re serialization, my concern is that > >> > > serialization > >> > > > > > often > >> > > > > > > > > > accounts > >> > > > > > > > > > > > > for a > >> > > > > > > > > > > > > > > lot > >> > > > > > > > > > > > > > > > of the cycles spent before returning the > >> > future. > >> > > > It's > >> > > > > > not > >> > > > > > > > > > > blocking > >> > > > > > > > > > > > > per > >> > > > > > > > > > > > > > > se, > >> > > > > > > > > > > > > > > > but it's the same effect from the caller's > >> > > > perspective. > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > Moreover, serde impls often block > >> themselves, > >> > > e.g. > >> > > > when > >> > > > > > > > > > fetching > >> > > > > > > > > > > > > schemas > >> > > > > > > > > > > > > > > > from a registry. I suppose it's also > >> possible > >> > to > >> > > > block > >> > > > > > in > >> > > > > > > > > > > > > Interceptors > >> > > > > > > > > > > > > > > > (e.g. writing audit events or metrics), > >> which > >> > > > happens > >> > > > > > > > before > >> > > > > > > > > > > serdes > >> > > > > > > > > > > > > iiuc. > >> > > > > > > > > > > > > > > > So any blocking in either of those plugins > >> > would > >> > > > block > >> > > > > > > the > >> > > > > > > > > send > >> > > > > > > > > > > > > unless we > >> > > > > > > > > > > > > > > > queue first. 
So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.

Another consideration is exception handling. If we queue right away, we'll defer some exceptions that currently are thrown to the caller (before the future is returned). In the new API, the send() wouldn't throw any exceptions, and instead the future would fail. I think that might mean that a new method signature is required.
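The exception-handling difference can be sketched as follows (hypothetical method names, not the real producer): today, pre-enqueue failures such as serialization errors are thrown synchronously before any future exists; under a queue-first design, every failure would instead surface through the returned future.

```java
import java.util.concurrent.CompletableFuture;

class AsyncErrorSketch {
    // Stand-in for today's contract: pre-enqueue failures (e.g. serialization
    // errors) are thrown directly to the caller, before any future is returned.
    static CompletableFuture<Long> sendThrowing(String payload) {
        if (payload == null) {
            throw new IllegalArgumentException("payload must not be null");
        }
        return CompletableFuture.completedFuture((long) payload.length());
    }

    // Stand-in for the queue-first contract: send() never throws; even
    // pre-enqueue failures fail the returned future instead.
    static CompletableFuture<Long> sendAsyncFailing(String payload) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        try {
            result.complete(sendThrowing(payload).join());
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("null payload failed the future: "
                + sendAsyncFailing(null).isCompletedExceptionally());
    }
}
```

Callers written against the first contract (try/catch around send()) would silently stop seeing those exceptions under the second, which is why a new, clearly distinct method signature seems warranted.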
Ryanne

On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:

Hey Ryanne,

I agree we should add an additional constructor (or else an additional overload in KafkaProducer#send, but the new constructor would be easier to understand) if we're targeting the "user provides the thread" approach.

From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method.
It doesn't seem like serialization depends on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.

Best,
Moses

On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Hey Moses, I like the direction here. My thinking is that a single additional work queue, s.t. send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users.
For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.

For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and would require opt-in for the new behavior.

Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends, which could fan out to per-partition threads from there.
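A rough sketch of this opt-in ThreadFactory idea, with all names hypothetical and a single worker standing in for the per-partition fan-out: the caller-supplied ThreadFactory builds the worker thread, sends are enqueued onto it, and send() returns immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

class QueuedSender<R> implements AutoCloseable {
    private final ExecutorService worker;
    private final Function<String, R> blockingSendPath; // stands in for serde + interceptors + enqueue

    QueuedSender(ThreadFactory threadFactory, Function<String, R> blockingSendPath) {
        // Requiring the caller to supply the factory makes the off-thread work explicit.
        this.worker = Executors.newSingleThreadExecutor(threadFactory);
        this.blockingSendPath = blockingSendPath;
    }

    // Enqueue and return right away; all potentially blocking work runs on the worker.
    CompletableFuture<R> send(String record) {
        return CompletableFuture.supplyAsync(() -> blockingSendPath.apply(record), worker);
    }

    @Override
    public void close() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        try (QueuedSender<Integer> sender = new QueuedSender<>(Thread::new, String::length)) {
            System.out.println("queued length = " + sender.send("hello").join());
        }
    }
}
```

Constructing one of these requires naming a ThreadFactory, so a caller can't accidentally opt in to off-thread execution, which is the point of the proposal above.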
So then you'd have two ways to send: the existing way, where serde and interceptors and whatnot are executed on the calling thread, and the new way, which returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.
Ryanne

On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:

Hey Folks,
I just posted a new proposal
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446>
in the wiki. I think we have an opportunity to improve the KafkaProducer#send user experience. It would certainly make our lives easier. Please take a look! There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.
Best,
Moses

--
Matthew de Detrich
*Aiven Deutschland GmbH*
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
*m:* +491603708037
*w:* aiven.io *e:* matthew.dedetr...@aiven.io