Hi Colin,

Sorry, I still don't follow.
Right now `KafkaProducer#send` seems to trigger a metadata fetch. Today, we block on that before returning. Is your proposal that we move the metadata fetch out of `KafkaProducer#send` entirely? Even if the metadata fetch becomes non-blocking, I think we still need to deal with the problems we've discussed before if the fetch happens in the `KafkaProducer#send` method. How do we maintain the ordering semantics of `KafkaProducer#send`? How do we prevent our buffer from filling up? Which thread is responsible for calling poll()? The only approach I can see that would avoid this would be moving the metadata fetch to happen at a different time. But it's not clear to me when would be a more appropriate time to do the metadata fetch than `KafkaProducer#send`.

I think there's something I'm missing here. Would you mind helping me figure out what it is?

Best,
Moses

On Sun, May 30, 2021 at 5:35 PM Colin McCabe <cmcc...@apache.org> wrote:

> On Tue, May 25, 2021, at 11:26, Nakamura wrote:

> > Hey Colin,

> > For the metadata case, what would fixing the bug look like? I agree that we should fix it, but I don't have a clear picture in my mind of what fixing it should look like. Can you elaborate?

> If the blocking metadata fetch bug were fixed, neither the producer nor the consumer would block while fetching metadata. A poll() call would initiate a metadata fetch if needed, and a subsequent call to poll() would handle the results if needed. Basically the same paradigm we use for other network communication in the producer and consumer.

> best,
> Colin

> > Best,
> > Moses
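To make the paradigm Colin describes concrete, here is a rough sketch of a poll()-driven metadata fetch; every name in it is an illustrative stand-in, not actual Kafka internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hedged sketch only: send() parks records whose topic metadata is unknown,
// and each poll() pass initiates and completes metadata work. All names here
// are illustrative, not Kafka internals.
class MetadataPollSketch {
    private final Set<String> knownTopics = new HashSet<>();
    private final Set<String> pendingFetches = new HashSet<>();
    private final Map<String, List<CompletableFuture<Void>>> parked = new HashMap<>();

    // Never blocks: unknown topic => park the future and flag a fetch.
    CompletableFuture<Void> send(String topic) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (knownTopics.contains(topic)) {
            result.complete(null); // would normally hand off to the accumulator
        } else {
            parked.computeIfAbsent(topic, t -> new ArrayList<>()).add(result);
            pendingFetches.add(topic); // fetch is initiated on the next poll()
        }
        return result;
    }

    // Driven by the network thread, like other client I/O: one call initiates
    // the fetch, a subsequent call handles the response.
    void poll() {
        knownTopics.addAll(pendingFetches); // stand-in for a real async fetch
        pendingFetches.clear();
        for (String topic : new ArrayList<>(parked.keySet())) {
            if (knownTopics.contains(topic)) {
                parked.remove(topic).forEach(f -> f.complete(null));
            }
        }
    }
}
```

Neither call blocks: send() only parks the record and flags the fetch, and the network thread's poll() loop does the rest.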
> > On Mon, May 24, 2021 at 1:54 PM Colin McCabe <cmcc...@apache.org> wrote:

> > > Hi all,

> > > I agree that we should give users the option of having a fully async API, but I don't think external thread pools or queues are the right direction to go here. They add performance overhead and don't address the root causes of the problem.

> > > There are basically two scenarios where we block, currently. One is when we are doing a metadata fetch. I think this is clearly a bug, or at least an implementation limitation. From the user's point of view, the fact that we are doing a metadata fetch is an implementation detail that really shouldn't be exposed like this. We have talked about fixing this in the past. I think we should just spend the time to do it.

> > > The second scenario is where the client has produced too much data in too little time. This could happen if there is a network glitch, or the server is slower than expected. In this case, the behavior is intentional and not a bug. To understand this, think about what would happen if we didn't block: we would start buffering more and more data in memory, until finally the application died with an out-of-memory error. That would be frustrating for users and wouldn't add to the usability of Kafka.

> > > We could potentially have an option to handle the out-of-memory scenario differently, by returning an error code immediately rather than blocking. Applications would have to be rewritten to handle this properly, but it is a possibility. I suspect that most of them wouldn't use this, but we could offer it as a possibility for async purists (which might include certain frameworks). The big problem the users would have to solve is what to do with the record that they were unable to produce due to the buffer-full issue.

> > > best,
> > > Colin
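A minimal sketch of the fail-fast option Colin describes, assuming an accumulator that rejects appends once its budget is spent (BufferExhaustedException is a real producer exception; the accumulator shape around it is illustrative only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.BufferExhaustedException;

// Hedged sketch of "return an error immediately rather than blocking".
class FailFastAccumulatorSketch {
    private final long maxBufferBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    FailFastAccumulatorSketch(long maxBufferBytes) {
        this.maxBufferBytes = maxBufferBytes;
    }

    CompletableFuture<Void> append(byte[] payload) {
        long newUsage = usedBytes.addAndGet(payload.length);
        if (newUsage > maxBufferBytes) {
            usedBytes.addAndGet(-payload.length); // roll back the reservation
            CompletableFuture<Void> failed = new CompletableFuture<>();
            // The caller immediately learns the buffer is full, and now owns
            // the problem of what to do with the unsent record.
            failed.completeExceptionally(new BufferExhaustedException("buffer full"));
            return failed;
        }
        return new CompletableFuture<>(); // completed later, when the batch is acked
    }
}
```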
> > > On Thu, May 20, 2021, at 10:35, Nakamura wrote:

> > > > > My suggestion was just to do this in multiple steps/phases: first let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.

> > > > I like that approach. I updated the "Which thread should be responsible for waiting" part of KIP-739 to add your suggestion as my recommended approach, thank you! If no one else has major concerns about that approach, I'll move the alternatives to "rejected alternatives".

> > > > On Thu, May 20, 2021 at 7:26 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

> > > > > @Nakamura

> > > > > On Wed, May 19, 2021 at 7:35 PM Nakamura <nny...@gmail.com> wrote:

> > > > > > @Ryanne: In my mind's eye I slightly prefer throwing the "cannot enqueue" exception to satisfying the future immediately with the "cannot enqueue" exception. But I agree, it would be worth doing more research.

> > > > > > @Matthew:

> > > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU bound, you definitely would want to use a bounded thread pool that is sized to the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.

> > > > > > I think this guide is good in general, but I would be hesitant to follow its guidance re: offloading serialization without benchmarking it. My understanding is that context switches have gotten much cheaper, and that gains from cache locality are small, but they're not nothing. Especially if the workload has a very small serialization cost, I wouldn't be shocked if it made things slower. I feel pretty strongly that we should do more research here before unconditionally encouraging serialization in a threadpool. If people think it's important to do it here (e.g. if we think it would mean another big API change) then we should start thinking about what benchmarking we can do to gain higher confidence in this kind of change. However, I don't think it would change semantics as substantially as we're proposing here, so I would vote for pushing this to a subsequent KIP.

> > > > > Of course, it's all down to benchmarking, benchmarking and benchmarking. Ideally you want to use all of the resources available to you, so if you have a bottleneck in serialization and you have many cores free, then using multiple cores may be more appropriate than a single core. Typically I would expect that using a single thread to do serialization is likely to be the most common situation; I was just responding to an earlier point that was made in regard to using ThreadPools for serialization (note that you can also just use a ThreadPool that is pinned to a single thread).

> > > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate-heavy changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least at a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least that's not any worse than what exists currently, and this can be an improvement that is done later.

> > > > > > Is there an existing threadpool that you suggest we reuse? Or are you imagining that we make our own internal threadpool, and then maybe expose configuration flags to manipulate it? For what it's worth, I like having an internal threadpool (perhaps just ForkJoinPool.commonPool()) and then providing an alternative to pass your own threadpool. That way people who want finer control can get it, and everyone else can do OK with the default.

> > > > > Indeed, that is what I am saying. The ideal situation is that there is a default internal threadpool that Kafka uses, but users of Kafka can configure their own threadpool. Having a singleton ThreadPool for blocking IO, non-blocking IO and CPU-bound tasks which can be plugged into all of your libraries (including Kafka) makes resource management much easier to do, and also gives users control to override specific threadpools for exceptional cases (e.g. providing a ThreadPool that is pinned to a single core, which tends to give the best latency results if that is something critical for you).

> > > > > My suggestion was just to do this in multiple steps/phases: first let's fix the issue of send being misleadingly asynchronous (i.e. internally it's blocking), and then later on we can make the various threadpools configurable with a sane default.
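As a concrete aside, the "ThreadPool that is pinned to a single thread" Matthew mentions is straightforward in Java. Note that true core pinning would need platform-specific tooling; this sketch only guarantees one dedicated thread, which is usually what matters for serde cache locality:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A "thread pool" pinned to a single thread; names are illustrative.
class PinnedSerializationPool {
    static final ExecutorService SERIALIZATION_POOL =
        Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, "producer-serialization");
            t.setDaemon(true); // don't keep the JVM alive for this helper thread
            return t;
        });
}
```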
> > > > > > On Wed, May 19, 2021 at 6:01 AM Matthew de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

> > > > > > > Here are my two cents (note that I am only looking at this at a surface level):

> > > > > > > 1. If we are going down this road it makes sense to do this "properly" (i.e. using queues as Ryanne suggested). The reason I am saying this is that the original goal of the KIP is for it to be used in other asynchronous systems, and from my personal experience you really do need to make the implementation properly asynchronous, otherwise it's really not that useful.

> > > > > > > 2. Due to the previous point and what was said by others, this is likely going to break some existing semantics (i.e. people are currently relying on blocking semantics), so adding another method/interface plus deprecating the older one is more annoying but ideal.

> > > > > > > 3. Using multiple thread pools is definitely recommended for different types of tasks; for serialization, which is CPU bound, you definitely would want to use a bounded thread pool that is sized to the number of CPUs (or something along those lines). https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c is a very good guide on this topic.

> > > > > > > 4. Regarding providing the ability for users to supply their own custom ThreadPool, this is more of an ergonomics question for the API. Especially when it gets to monitoring/tracing, giving users the ability to provide their own custom IO/CPU ThreadPools is ideal; however, as stated, doing so means a lot of boilerplate-heavy changes to the API. Typically speaking, a lot of monitoring/tracing/diagnosing is done on ExecutionContexts/ThreadPools (at least at a more rudimentary level), and hence allowing users to supply a global singleton ThreadPool for IO tasks and another for CPU tasks makes their lives a lot easier. However, due to the large amount of changes to the API, it may be more appropriate to just use internal thread pools (for now), since at least that's not any worse than what exists currently, and this can be an improvement that is done later.

> > > > > > > On Wed, May 19, 2021 at 2:56 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> > > > > > > > I was thinking the sender would typically wrap send() in a backoff/retry loop, or else ignore any failures and drop sends on the floor (fire-and-forget), and in both cases I think failing immediately is better than blocking for a new spot in the queue or asynchronously failing somehow.

> > > > > > > > I think a failed future is adequate for the "explicit backpressure signal" while avoiding any blocking anywhere. I think if we try to asynchronously signal failure to the caller (either by asynchronously failing the future or invoking a callback off-thread or something) we'd force the caller to either block or poll waiting for that signal, which somewhat defeats the purpose we're after. And of course blocking for a spot in the queue definitely defeats the purpose (tho perhaps ameliorates the problem some).

> > > > > > > > Throwing an exception to the caller directly (not via the future) is another option with precedent in Kafka clients, tho it doesn't seem as ergonomic to me.

> > > > > > > > It would be interesting to analyze some existing usage and determine how difficult it would be to convert it to the various proposed APIs.

> > > > > > > > Ryanne
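A rough sketch of the caller-side backoff/retry loop Ryanne describes, assuming a hypothetical fail-fast send() that throws instead of blocking when the buffer is full (today's producer instead blocks for up to max.block.ms):

```java
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hedged sketch: the fail-fast behavior is the proposal under discussion,
// not today's producer. Fire-and-forget callers would drop the record
// instead of looping.
class BackoffRetrySender {
    static Future<RecordMetadata> sendWithRetry(Producer<String, String> producer,
                                                ProducerRecord<String, String> record)
            throws InterruptedException {
        long backoffMs = 10;
        while (true) {
            try {
                return producer.send(record);
            } catch (BufferExhaustedException e) { // hypothetical fail-fast signal
                Thread.sleep(backoffMs);           // simple capped exponential backoff
                backoffMs = Math.min(backoffMs * 2, 1_000);
            }
        }
    }
}
```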
> > > > > > > > On Tue, May 18, 2021, 3:27 PM Nakamura <nny...@gmail.com> wrote:

> > > > > > > > > Hi Ryanne,

> > > > > > > > > Hmm, that's an interesting idea. Basically it would mean that after calling send, you would also have to check whether the returned future had failed with a specific exception. I would be open to it, although I think it might be slightly more surprising, since right now the paradigm is "enqueue synchronously, the future represents whether we succeeded in sending or not", and the new one would be "enqueue synchronously, the future either represents whether we succeeded in enqueueing or not (in which case it will be failed immediately if it failed to enqueue) or whether we succeeded in sending or not".

> > > > > > > > > But you're right, it should be on the table, thank you for suggesting it!

> > > > > > > > > Best,
> > > > > > > > > Moses

> > > > > > > > > On Tue, May 18, 2021 at 12:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> > > > > > > > > > Moses, in the case of a full queue, could we just return a failed future immediately?

> > > > > > > > > > Ryanne
> > > > > > > > > > On Tue, May 18, 2021, 10:39 AM Nakamura <nny...@gmail.com> wrote:

> > > > > > > > > > > Hi Alexandre,

> > > > > > > > > > > Thanks for bringing this up, I think I could use some feedback in this area. There are two mechanisms here: one for slowing down when we don't have the relevant metadata, and the other for slowing down when a queue has filled up. Although the first one applies backpressure somewhat inadvertently, we could still get in trouble if we're not providing information to the mechanism that monitors whether we're queueing too much. As for the second one, that is a classic backpressure use case, so it's definitely important that we don't drop that ability.

> > > > > > > > > > > Right now backpressure is applied by blocking, which is a natural way to apply backpressure in synchronous systems, but can lead to unnecessary slowdowns in asynchronous systems. In my opinion, the safest way to apply backpressure in an asynchronous model is to have an explicit backpressure signal. A good example would be returning an exception, and providing an optional hook to add a callback onto, so that you can be notified when it's ready to accept more messages.

> > > > > > > > > > > However, this would be a really big change to how users use KafkaProducer#send, so I don't know how much appetite we have for making that kind of change. Maybe it would be simpler to remove the "don't block when the per-topic queue is full" behavior from the scope of this KIP, and only focus on when metadata is available? The downside is that we will probably want to change the API again later to fix this, so it might be better to just rip the bandaid off now.

> > > > > > > > > > > One slightly nasty thing here is that because queueing order is important, if we want to use exceptions, we will want to be able to signal the failure to enqueue to the caller in such a way that they can still enforce message order if they want. So we can't embed the failure directly in the returned future; we should either return two futures (nested, or as a tuple) or else throw an exception to signal backpressure.

> > > > > > > > > > > So there are a few things we should work out here:

> > > > > > > > > > > 1. Should we keep the "too many bytes enqueued" part of this in scope? (I would say yes, so that we can minimize churn in this API.)

> > > > > > > > > > > 2. How should we signal backpressure so that it's appropriate for asynchronous systems? (I would say that we should throw an exception. If we choose this and we want to pursue the queueing path, we would *not* want to enqueue messages that would push us over the limit; we would only want to enqueue messages when we're waiting for metadata, and we would want to keep track of the total number of bytes for those messages.)

> > > > > > > > > > > What do you think?

> > > > > > > > > > > Best,
> > > > > > > > > > > Moses
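A sketch of the "two futures (nested)" shape mentioned above, purely for illustration and not a proposed final API; the outer future carries the enqueue/backpressure signal, the inner one the delivery result:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hedged sketch: interface name and shape are illustrative only.
interface TwoPhaseSend<K, V> {
    // Outer future completes (or fails) when the record is accepted into the
    // queue; its value completes when the broker acks (or delivery fails).
    CompletableFuture<CompletableFuture<RecordMetadata>> send(ProducerRecord<K, V> record);
}
```

A caller that cares about ordering can wait on (or chain from) the outer future before issuing its next send, and only look at the inner future later.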
> > > > > > > > > > > On Sun, May 16, 2021 at 6:16 AM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

> > > > > > > > > > > > Hello Nakamura,

> > > > > > > > > > > > Thanks for proposing this change. I can see how the blocking behaviour can be a problem when integrating with reactive frameworks such as Akka. One of the questions I would have is how you would handle back pressure and avoid memory exhaustion when the producer's buffer is full and tasks would start to accumulate in the out-of-band queue or thread pool introduced with this KIP.

> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Alexandre

> > > > > > > > > > > > On Fri, May 14, 2021 at 15:55, Ryanne Dolan <ryannedo...@gmail.com> wrote:

> > > > > > > > > > > > > Makes sense!

> > > > > > > > > > > > > Ryanne

> > > > > > > > > > > > > On Fri, May 14, 2021, 9:39 AM Nakamura <nny...@gmail.com> wrote:

> > > > > > > > > > > > > > Hey Ryanne,

> > > > > > > > > > > > > > I see what you're saying about serde blocking, but I think we should consider it out of scope for this patch. Right now we've nailed down a couple of use cases where we can unambiguously say "I can make progress now" or "I cannot make progress now", which makes it possible to offload to a different thread only if we are unable to make progress. Extending this to CPU work like serde would mean always offloading, which would be a really big performance change. It might be worth exploring anyway, but I'd rather keep this patch focused on improving ergonomics, rather than muddying the waters with evaluating performance very deeply.

> > > > > > > > > > > > > > I think if we really do want to support serdes or interceptors that do IO on the send path (which seems like an anti-pattern to me), we should consider making that a separate KIP, and probably also consider changing the API to use Futures (or CompletionStages). But I would rather avoid scope creep, so that we have a better chance of fixing this part of the problem.

> > > > > > > > > > > > > > Yes, I think some exceptions will move to being async instead of sync. They'll still be surfaced in the Future, so I'm not so confident that it would be that big a shock to users though.

> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Moses
> > > > > > > > > > > > > > On Thu, May 13, 2021 at 7:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> > > > > > > > > > > > > > > Re serialization, my concern is that serialization often accounts for a lot of the cycles spent before returning the future. It's not blocking per se, but it's the same effect from the caller's perspective.

> > > > > > > > > > > > > > > Moreover, serde impls often block themselves, e.g. when fetching schemas from a registry. I suppose it's also possible to block in Interceptors (e.g. writing audit events or metrics), which happens before serdes iiuc. So any blocking in either of those plugins would block the send unless we queue first.

> > > > > > > > > > > > > > > So I think we want to queue first and do everything off-thread when using the new API, whatever that looks like. I just want to make sure we don't do that for clients that wouldn't expect it.

> > > > > > > > > > > > > > > Another consideration is exception handling. If we queue right away, we'll defer some exceptions that currently are thrown to the caller (before the future is returned). In the new API, send() wouldn't throw any exceptions, and instead the future would fail. I think that might mean that a new method signature is required.

> > > > > > > > > > > > > > > Ryanne
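For illustration only, a guess at the shape such a new signature could take, under the assumption that it never throws and never blocks, and that every failure (buffer-full, metadata timeout, serde errors) surfaces through the returned stage:

```java
import java.util.concurrent.CompletionStage;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hedged sketch: name and shape are hypothetical, not the KIP's final API.
interface AsyncSender<K, V> {
    CompletionStage<RecordMetadata> sendAsync(ProducerRecord<K, V> record);
}
```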
> > > > > > > > > > > > > > > On Thu, May 13, 2021, 2:57 PM Nakamura <nakamura.mo...@gmail.com> wrote:

> > > > > > > > > > > > > > > > Hey Ryanne,

> > > > > > > > > > > > > > > > I agree we should add an additional constructor (or else an additional overload in KafkaProducer#send, but the new constructor would be easier to understand) if we're targeting the "user provides the thread" approach.

> > > > > > > > > > > > > > > > From looking at the code, I think we can keep record serialization on the user thread, if we consider that an important part of the semantics of this method. It doesn't seem like serialization depends on knowing the cluster; I think it's incidental that it comes after the first "blocking" segment in the method.

> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > Moses

> > > > > > > > > > > > > > > > On Thu, May 13, 2021 at 2:38 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> > > > > > > > > > > > > > > > > Hey Moses, I like the direction here. My thinking is that a single additional work queue, s.t. send() can enqueue and return, seems like the lightest touch. However, I don't think we can trivially process that queue in an internal thread pool without subtly changing behavior for some users. For example, users will often run send() in multiple threads in order to serialize faster, but that wouldn't work quite the same if there were an internal thread pool.

> > > > > > > > > > > > > > > > > For this reason I'm thinking we need to make sure any such changes are opt-in. Maybe a new constructor with an additional ThreadFactory parameter. That would at least clearly indicate that work will happen off-thread, and would require opt-in for the new behavior.

> > > > > > > > > > > > > > > > > Under the hood, this ThreadFactory could be used to create the worker thread that processes queued sends, which could fan out to per-partition threads from there.

> > > > > > > > > > > > > > > > > So then you'd have two ways to send: the existing way, where serdes and interceptors and whatnot are executed on the calling thread, and the new way, which returns right away and uses an internal Executor. As you point out, the semantics would be identical in either case, and it would be very easy for clients to switch.

> > > > > > > > > > > > > > > > > Ryanne
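A sketch of the opt-in constructor idea Ryanne describes, under the assumption that the ThreadFactory is used to create the internal worker; this is not an existing KafkaProducer constructor:

```java
import java.util.Properties;
import java.util.concurrent.ThreadFactory;

// Hedged sketch: class name and internals are illustrative only. Passing a
// ThreadFactory clearly signals that sends will be processed off-thread.
class OffThreadProducerSketch<K, V> {
    OffThreadProducerSketch(Properties config, ThreadFactory workerFactory) {
        Thread worker = workerFactory.newThread(this::processQueuedSends);
        worker.start(); // the worker drains the send queue off-thread
    }

    private void processQueuedSends() {
        // drain the work queue, possibly fanning out to per-partition threads
    }
}
```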
> > > > > > > > > > > > > > > > > On Thu, May 13, 2021, 9:00 AM Nakamura <nny...@gmail.com> wrote:

> > > > > > > > > > > > > > > > > > Hey Folks,

> > > > > > > > > > > > > > > > > > I just posted a new proposal <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446> in the wiki. I think we have an opportunity to improve the KafkaProducer#send user experience.
> > > > > > > > > > > > > > > > > > It would certainly make our lives easier. Please take a look! There are two subproblems that I could use guidance on, so I would appreciate feedback on both of them.

> > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > Moses

> > > > > > > > > > > > > > --
> > > > > > > > > > > > > > Matthew de Detrich
> > > > > > > > > > > > > > *Aiven Deutschland GmbH*
> > > > > > > > > > > > > > Immanuelkirchstraße 26, 10405 Berlin
> > > > > > > > > > > > > > Amtsgericht Charlottenburg, HRB 209739 B
> > > > > > > > > > > > > > *m:* +491603708037
> > > > > > > > > > > > > > *w:* aiven.io *e:* matthew.dedetr...@aiven.io