Hey Erik,

One more thing I want to add to my comment: we are in the process of rewriting the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in the process of converting it into KIP-945.
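The split between a user thread and a background thread can be pictured with a small Java sketch. It assumes a design in which user-facing calls only enqueue commands, and a single background thread owns all mutable client state. `BackgroundThreadClient`, `add`, and the command queue are hypothetical names. This is not the KIP-945 code and not the KafkaConsumer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch (not the KIP-945 design): user threads never touch the
 * client state directly; they enqueue commands that one background thread runs.
 */
final class BackgroundThreadClient {
    private final BlockingQueue<Runnable> commands = new LinkedBlockingQueue<>();
    private final Thread background = new Thread(this::runLoop, "client-background");
    private long state; // confined to the background thread, so it needs no lock

    BackgroundThreadClient() {
        background.setDaemon(true);
        background.start();
    }

    private void runLoop() {
        try {
            while (true) {
                commands.take().run(); // commands execute one at a time, in FIFO order
            }
        } catch (InterruptedException e) {
            // close() interrupts the thread to end the loop
        }
    }

    /** Safe to call from any user thread; the result comes back through a future. */
    CompletableFuture<Long> add(long delta) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        commands.add(() -> {
            state += delta; // only ever executed on the background thread
            result.complete(state);
        });
        return result;
    }

    void close() {
        background.interrupt();
    }

    /** Demo: four user threads each issue 1,000 commands; returns the final state. */
    static long demo() {
        BackgroundThreadClient client = new BackgroundThreadClient();
        ExecutorService users = Executors.newFixedThreadPool(4);
        try {
            List<Future<?>> done = new ArrayList<>();
            for (int t = 0; t < 4; t++) {
                done.add(users.submit(() -> {
                    for (int i = 0; i < 1_000; i++) {
                        client.add(1).join();
                    }
                }));
            }
            for (Future<?> f : done) {
                f.get();
            }
            return client.add(0).join();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            users.shutdown();
            client.close();
        }
    }
}
```

`demo()` returns 4000. No lock protects `state`, because only the background thread reads or writes it. Callers see results through the `BlockingQueue` handoff and the future's completion, both of which the `java.util.concurrent` documentation lists as happens-before edges.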
Thanks,
P

On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:
> Hey Erik,
>
> Sorry for holding off on this email for a few days, since Colin's response
> covers some of my concerns. I'm in favor of this KIP, and I think your
> approach seems safe. Of course, I have probably missed something, so I
> think this KIP needs to cover different use cases to demonstrate that it
> doesn't cause any unsafe access. I think this can be demonstrated via
> diagrams and some code in the KIP.
>
> Thanks,
> P
>
> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
> <e.vanoos...@grons.nl.invalid> wrote:
>
>> Hello Colin,
>>
>> >> In KIP-944, the callback thread can only delegate to another thread
>> >> after reading from and writing to a threadlocal variable, providing the
>> >> barriers right there.
>>
>> > I don't see any documentation that accessing thread local variables
>> > provides a total store or load barrier. Do you have such documentation?
>> > It seems like if this were the case, we could eliminate volatile
>> > variables from most of the code base.
>>
>> I was imprecise there. The thread-locals are only somewhat involved. In
>> the KIP proposal, the callback thread reads an access key from a
>> thread-local variable. It then needs to pass that access key to another
>> thread, which can then set it on its own thread-local variable. The act
>> of passing a value from one thread to another implies that a memory
>> barrier needs to be passed. However, this is not very relevant, since
>> there is no need to pass the access key back when the other thread is
>> done.
>>
>> But now that I think about it a bit more, the locking mechanism runs in a
>> synchronized block. If I remember correctly, this should be enough to
>> pass read and write barriers.
>>
>> >> In the current implementation the consumer is also invoked from
>> >> random threads. If it works now, it should continue to work.
>> > I'm not sure what you're referring to. Can you expand on this?
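The access-key handoff described here can be sketched in Java under some assumptions. The guard, the key object, and every name (`AccessKeyHandoff`, `grantKeyToCurrentThread`, `installKey`) are hypothetical, and this is not the KIP-944 implementation. The sketch shows where the Java Memory Model actually creates happens-before edges. The thread-local read and write do not order anything on their own. The edges come from the `ExecutorService.submit` handoff and from entering and leaving the same `synchronized` monitor.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical illustration of an access-key handoff; NOT the KIP-944 code.
 * A thread may use the guarded object only if its thread-local holds the active key.
 */
final class AccessKeyHandoff {
    private static final ThreadLocal<Object> ACCESS_KEY = new ThreadLocal<>();

    private final Object monitor = new Object();
    private Object activeKey;  // guarded by monitor
    private boolean inUse;     // guarded by monitor; rejects overlapping calls
    private long plainState;   // not volatile: visibility relies on the monitor

    /** On the callback thread: create a key and store it in this thread's thread-local. */
    Object grantKeyToCurrentThread() {
        synchronized (monitor) {
            activeKey = new Object();
            ACCESS_KEY.set(activeKey);
            return activeKey;
        }
    }

    /** On the receiving thread: install a key that another thread handed over. */
    static void installKey(Object key) {
        ACCESS_KEY.set(key); // a thread-local write alone is NOT a memory barrier
    }

    private void acquire() {
        synchronized (monitor) { // monitor enter: sees writes made before the last exit
            if (activeKey == null || ACCESS_KEY.get() != activeKey || inUse) {
                throw new ConcurrentModificationException("caller does not hold the access key");
            }
            inUse = true;
        }
    }

    private void release() {
        synchronized (monitor) { // monitor exit: publishes this thread's earlier writes
            inUse = false;
        }
    }

    /** Stands in for a consumer method such as poll or commitSync. */
    long increment() {
        acquire();
        try {
            return ++plainState;
        } finally {
            release();
        }
    }

    /** Demo: the callback thread hands its key to a worker thread, which may then call in. */
    static long demo() {
        AccessKeyHandoff guarded = new AccessKeyHandoff();
        Object key = guarded.grantKeyToCurrentThread();
        guarded.increment(); // plainState == 1, written on this thread
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<Long> result = worker.submit(() -> {
                installKey(key);            // worker sets its own thread-local
                return guarded.increment(); // sees plainState == 1, returns 2
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    /** Demo: a thread that never received the key is rejected. */
    static boolean rejectsThreadWithoutKey() {
        AccessKeyHandoff guarded = new AccessKeyHandoff();
        guarded.grantKeyToCurrentThread();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            return other.submit(() -> {
                try {
                    guarded.increment();
                    return false;
                } catch (ConcurrentModificationException e) {
                    return true;
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            other.shutdown();
        }
    }
}
```

`demo()` returns 2 because the worker's monitor enter follows the callback thread's monitor exit, so the worker sees `plainState == 1`. A thread without the key gets a `ConcurrentModificationException`. This loosely mirrors the exception KafkaConsumer throws on multi-threaded access, but it is not that check.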
>>
>> Any invocation of the consumer (e.g. the poll method) comes from a thread
>> that is not managed by the consumer. This is what I assumed you meant by
>> the term 'random thread'.
>>
>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>> > have 10 green threads you're multiplexing onto one CPU thread, and that
>> > CPU thread gets blocked because of what one green thread is doing, the
>> > other 9 green threads are blocked too, right? I guess it's "just" a
>> > performance problem, but it still seems like it could be a serious one.
>>
>> There are several ways to deal with this. All async runtimes I know
>> (Akka, ZIO, Cats Effect) support this by letting you mark a task as
>> blocking. The runtime will then either schedule it on another thread
>> pool, or grow the thread pool to accommodate it. In either case, 'the
>> other 9 green threads' will simply be scheduled onto another real
>> thread. In addition, some of these runtimes detect long-running tasks
>> and reschedule waiting tasks onto another thread. This is all a bit
>> off topic though.
>>
>> > I don't see why this has to be "inherently multi-threaded." Why can't
>> > we have the other threads report back what messages they've processed
>> > to the worker thread? Then it will be able to handle these callbacks
>> > without involving the other threads.
>>
>> Please consider the context, which is that we are running inside the
>> callback of the rebalance listener. The only way to execute something
>> and also have a timeout on it is to run that something on another thread.
>>
>> Kind regards,
>> Erik.
>>
>>
>> On 08-07-2023 at 19:17, Colin McCabe wrote:
>> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
>> >> Hi Colin,
>> >>
>> >> Thanks for your thoughts and for taking the time to reply.
>> >>
>> >> Let me address your concerns. None of your worries are an issue with
>> >> the algorithm described in KIP-944.
>> >> Here it goes:
>> >>
>> >> > It's not clear to me that it's safe to access the Kafka consumer or
>> >> > producer concurrently from different threads.
>> >>
>> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
>> >> to great lengths to make sure that this cannot happen.
>> >>
>> >> *The only design goal is to allow callbacks to call the consumer from
>> >> another thread.*
>> >>
>> >> To make sure there are no more misunderstandings about this, I have
>> >> added this goal to the KIP.
>> >>
>> > Hi Erik,
>> >
>> > Sorry, I spoke imprecisely. My concern is not concurrent access, but
>> > multithreaded access in general: basically, cache line visibility issues.
>> >
>> >> > This is true even if the accesses happen at different times, because
>> >> > modern CPUs require memory barriers to guarantee inter-thread
>> >> > visibility of loads and stores.
>> >>
>> >> In KIP-944, the callback thread can only delegate to another thread
>> >> after reading from and writing to a threadlocal variable, providing the
>> >> barriers right there.
>> >>
>> > I don't see any documentation that accessing thread local variables
>> > provides a total store or load barrier. Do you have such documentation?
>> > It seems like if this were the case, we could eliminate volatile
>> > variables from most of the code base.
>> >
>> >> > I know that there are at least a few locks in the consumer code now,
>> >> > due to our need to send heartbeats from a worker thread. I don't think
>> >> > those would be sufficient to protect a client that is making calls
>> >> > from random threads.
>> >>
>> >> In the current implementation the consumer is also invoked from random
>> >> threads. If it works now, it should continue to work.
>> >>
>> > I'm not sure what you're referring to. Can you expand on this?
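Colin's visibility concern can be made concrete with a minimal Java Memory Model sketch. `VolatilePublication` is a hypothetical class, not part of the Kafka client. Suppose a plain write is followed by a volatile write, and a volatile read on another thread observes that second write. The first write is then guaranteed to be visible to the reader (JLS §17.4.5, happens-before order). If `ready` were a plain field, the reader could legally spin forever or read a stale `payload`, even though the write "happened earlier" in wall-clock time.

```java
/**
 * Minimal Java Memory Model sketch (hypothetical class): a plain write becomes
 * visible to another thread only through a happens-before edge, here a volatile flag.
 */
final class VolatilePublication {
    private int payload;             // plain field, written before the flag
    private volatile boolean ready;  // volatile: the write/read pair orders the payload write

    void publish(int value) {
        payload = value;  // 1. ordinary write
        ready = true;     // 2. volatile write publishes everything written before it
    }

    int awaitPayload() {
        while (!ready) {  // 3. volatile read; once it observes true ...
            Thread.onSpinWait();
        }
        return payload;   // 4. ... this read is guaranteed to see the value from step 1
    }

    /** Demo: a reader thread waits for a value published by the calling thread. */
    static int demo() {
        VolatilePublication box = new VolatilePublication();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> seen[0] = box.awaitPayload());
        reader.start();
        box.publish(42);
        try {
            reader.join();  // join also creates a happens-before edge for seen[0]
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

`demo()` returns 42. The same guarantee is what `synchronized` blocks and `java.util.concurrent` handoffs provide. A `ThreadLocal` access, by itself, does not.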
>> >
>> >> > There has been some discussion of moving to a more traditional model
>> >> > where people make calls to the client and the client passes the given
>> >> > data to a single background worker thread. This would avoid a lot of
>> >> > the footguns of the current model and probably better reflect how
>> >> > people actually use the client.
>> >>
>> >> That is awesome. However, I'd rather not wait for that.
>> >>
>> >> > Another issue is that neither the producer nor the consumer is fully
>> >> > nonblocking. There are some corner cases where we do in fact block.
>> >> > From memory, the producer blocks in some "buffer full" cases, and the
>> >> > consumer blocks sometimes when fetching metadata.
>> >>
>> >> I am aware of that. This is not an issue; all async runtimes can
>> >> cooperate with blocking code.
>> >>
>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>> > have 10 green threads you're multiplexing onto one CPU thread, and that
>> > CPU thread gets blocked because of what one green thread is doing, the
>> > other 9 green threads are blocked too, right? I guess it's "just" a
>> > performance problem, but it still seems like it could be a serious one.
>> >
>> >> > I suspect it would be more appropriate for Kotlin coroutines, Zio
>> >> > coroutines and so on to adopt this "pass messages to and from a
>> >> > background worker thread" model than to try to re-engineer the Kafka
>> >> > client to work from random threads.
>> >>
>> >> In both zio-kafka and fs2-kafka this is already the approach we are
>> >> taking.
>> >>
>> >> Unfortunately, the Kafka consumer forces us to perform some work in
>> >> callbacks:
>> >>
>> >> * commit completed callback: register that the callback is complete,
>> >> * partition revoked callback: in this callback we need to submit
>> >>   commits from everything consumed and processed so far, using
>> >>   timeouts if processing takes too long. In an async runtime, this is
>> >>   an inherently multi-threaded process.
>> >>   In particular, we cannot do timeouts without involving multiple
>> >>   threads.
>> >>
>> > I don't see why this has to be "inherently multi-threaded." Why can't
>> > we have the other threads report back what messages they've processed
>> > to the worker thread? Then it will be able to handle these callbacks
>> > without involving the other threads.
>> >
>> > regards,
>> > Colin
>> >
>> >> I have extended the KIP's motivation to explain the major use case.
>> >>
>> >> Please read KIP-944 again. Even though the description is extensive
>> >> (this callback-from-callback stuff is tricky), you will find that my
>> >> goals are modest.
>> >>
>> >> Also, the implementation is just a few lines. Once you understand the
>> >> idea, it should not be much work to follow it.
>> >>
>> >> Kind regards,
>> >> Erik.
>> >>
>> >>
>> >> On 07-07-2023 at 19:57, Colin McCabe wrote:
>> >>> Hi Erik,
>> >>>
>> >>> It's not clear to me that it's safe to access the Kafka consumer or
>> >>> producer concurrently from different threads. There are data
>> >>> structures that aren't protected by locks, so I wouldn't necessarily
>> >>> expect accessing and mutating them in a concurrent way to work. This
>> >>> is true even if the accesses happen at different times, because
>> >>> modern CPUs require memory barriers to guarantee inter-thread
>> >>> visibility of loads and stores.
>> >>>
>> >>> I am writing this without doing a detailed dive into the code (I
>> >>> haven't been into the consumer / producer code in a bit). Someone who
>> >>> has worked more on the consumer recently might be able to give
>> >>> specific examples of things that wouldn't work.
>> >>>
>> >>> I know that there are at least a few locks in the consumer code now,
>> >>> due to our need to send heartbeats from a worker thread. I don't think
>> >>> those would be sufficient to protect a client that is making calls
>> >>> from random threads.
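Erik's point that a partitions-revoked callback can only impose a timeout by involving another thread can be illustrated with plain `java.util.concurrent`. If the work ran on the callback thread itself, nothing would be left to enforce the deadline. If the work runs elsewhere, the callback thread can wait on a `Future` with a timeout. `CallbackTimeout` and `awaitWithTimeout` are hypothetical names. This is not zio-kafka or fs2-kafka code, and it does not call the Kafka client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical sketch: a callback can bound how long it waits for work only if
 * that work runs on another thread; the callback thread just waits with a deadline.
 */
final class CallbackTimeout {
    /** Returns true if the work finished within timeoutMillis, false otherwise. */
    static boolean awaitWithTimeout(ExecutorService pool, Callable<Void> work, long timeoutMillis) {
        Future<Void> future = pool.submit(work);  // the work runs on a pool thread
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true);                   // stop waiting; interrupt the worker
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Demo: a quick task completes in time; a slow one is abandoned after 100 ms. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            boolean fast = awaitWithTimeout(pool, () -> null, 1_000);
            boolean slow = awaitWithTimeout(pool, () -> {
                Thread.sleep(10_000);
                return null;
            }, 100);
            return new boolean[] {fast, slow};
        } finally {
            pool.shutdownNow();
        }
    }
}
```

`cancel(true)` only interrupts the worker. Processing code that ignores interruption keeps running after the callback has moved on.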
>> >>>
>> >>> There has been some discussion of moving to a more traditional model
>> >>> where people make calls to the client and the client passes the given
>> >>> data to a single background worker thread. This would avoid a lot of
>> >>> the footguns of the current model and probably better reflect how
>> >>> people actually use the client.
>> >>>
>> >>> Another issue is that neither the producer nor the consumer is fully
>> >>> nonblocking. There are some corner cases where we do in fact block.
>> >>> From memory, the producer blocks in some "buffer full" cases, and the
>> >>> consumer blocks sometimes when fetching metadata.
>> >>>
>> >>> I suspect it would be more appropriate for Kotlin coroutines, Zio
>> >>> coroutines and so on to adopt this "pass messages to and from a
>> >>> background worker thread" model than to try to re-engineer the Kafka
>> >>> client to work from random threads.
>> >>>
>> >>> There is actually some good advice about how to handle multiple
>> >>> threads in the Javadoc at the top of KafkaConsumer.java itself. Check
>> >>> the sections "One Consumer Per Thread" and "Decouple Consumption and
>> >>> Processing." What I'm recommending here is essentially the latter.
>> >>>
>> >>> I do understand that it's frustrating to not get a quick response.
>> >>> However, overall I think this one needs a lot more discussion before
>> >>> getting anywhere near a vote. I will leave a -1 just as a procedural
>> >>> step. Maybe some of the people working in the client area can also
>> >>> chime in.
>> >>>
>> >>> best,
>> >>> Colin
>> >>>
>> >>>
>> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>> >>>> Dear PMCs,
>> >>>>
>> >>>> So far there have been 0 responses to KIP-944. I understand this may
>> >>>> not be something that keeps you busy, but this KIP is important to
>> >>>> people that use async runtimes like Zio, Cats and Kotlin.
>> >>>>
>> >>>> Is there anything you need to come to a decision?
>> >>>>
>> >>>> Kind regards,
>> >>>> Erik.
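Colin's "Decouple Consumption and Processing" suggestion has processing threads report progress back, so that the consumer thread can commit without calling into them. The Java sketch below illustrates that idea. `ProcessedOffsets`, `markProcessed`, and `snapshotForCommit` are hypothetical names. Partitions are plain strings rather than Kafka's `TopicPartition` so that the sketch stays self-contained. The stored value follows Kafka's convention that the committed offset is the next offset to read, i.e. the last processed offset + 1.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of "processing threads report back what they processed":
 * workers record progress, and the consumer thread (e.g. inside a partitions-revoked
 * callback) reads a snapshot without calling into the workers.
 */
final class ProcessedOffsets {
    // partition name -> next offset to commit (last processed offset + 1)
    private final ConcurrentHashMap<String, Long> nextOffsets = new ConcurrentHashMap<>();

    /** Called by any processing thread after it has fully processed a record. */
    void markProcessed(String partition, long offset) {
        nextOffsets.merge(partition, offset + 1, Math::max); // never move backwards
    }

    /** Called on the consumer thread; returns an immutable copy to commit. */
    Map<String, Long> snapshotForCommit() {
        return Map.copyOf(nextOffsets);
    }

    /** Demo: two processing threads; the consumer thread then takes a snapshot. */
    static Map<String, Long> demo() {
        ProcessedOffsets offsets = new ProcessedOffsets();
        Thread a = new Thread(() -> {
            for (long o = 0; o < 100; o++) offsets.markProcessed("topic-0", o);
        });
        Thread b = new Thread(() -> {
            for (long o = 0; o < 50; o++) offsets.markProcessed("topic-1", o);
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return offsets.snapshotForCommit(); // {topic-0=100, topic-1=50}
    }
}
```

The snapshot only covers records that have already been processed. Bounding the wait for records still in flight is the part Erik argues requires another thread.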
>> >>>>
>> >>>>
>> >>>> On 05-07-2023 at 11:38, Erik van Oosten wrote:
>> >>>>> Hello all,
>> >>>>>
>> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in
>> >>>>> consumer. It has been 'under discussion' for 7 days now ('under
>> >>>>> discussion' in quotes, because there have been 0 comments so far).
>> >>>>> I hope the KIP is clear!
>> >>>>>
>> >>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>> >>>>>
>> >>>>> Kind regards,
>> >>>>> Erik.
>> >>>>>
>> >>>>>
>> >> --
>> >> Erik van Oosten
>> >> e.vanoos...@grons.nl
>> >> https://day-to-day-stuff.blogspot.com
>>
>> --
>> Erik van Oosten
>> e.vanoos...@grons.nl
>> https://day-to-day-stuff.blogspot.com
>>
>>