Hi Philip & Erik,

Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads.
It also generally helps to have some use cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945.

By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;)

best,
Colin

On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:
> Hey Erik - Another thing I want to add to my comment: we are in the process
> of rewriting the KafkaConsumer, and I think your proposal would work in
> the new consumer because we are going to separate the user thread and the
> background thread. Here is the 1-pager, and we are in the process of
> converting this into KIP-945.
>
> Thanks,
> P
>
> On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:
>
>> Hey Erik,
>>
>> Sorry for holding up this email for a few days since Colin's response
>> includes some of my concerns. I'm in favor of this KIP, and I think your
>> approach seems safe. Of course, I probably missed something; therefore I
>> think this KIP needs to cover different use cases to demonstrate it doesn't
>> cause any unsafe access. I think this can be demonstrated via diagrams and
>> some code in the KIP.
>>
>> Thanks,
>> P
>>
>> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
>> <e.vanoos...@grons.nl.invalid> wrote:
>>
>>> Hello Colin,
>>>
>>> >> In KIP-944, the callback thread can only delegate to another thread
>>> >> after reading from and writing to a threadlocal variable, providing the
>>> >> barriers right there.
>>>
>>> > I don't see any documentation that accessing thread local variables
>>> > provides a total store or load barrier. Do you have such documentation?
>>> > It seems like if this were the case, we could eliminate volatile
>>> > variables from most of the code base.
>>>
>>> Now I was imprecise. The thread-locals are only somewhat involved.
>>> In the KIP proposal the callback thread reads an access key from a
>>> thread-local variable. It then needs to pass that access key to another
>>> thread, which then can set it on its own thread-local variable. The act
>>> of passing a value from one thread to another implies that a memory
>>> barrier is crossed. However, this is all not so relevant since there is
>>> no need to pass the access key back when the other thread is done.
>>>
>>> But now that I think about it a bit more, the locking mechanism runs in a
>>> synchronized block. If I remember correctly, this should be enough to
>>> pass read and write barriers.
>>>
>>> >> In the current implementation the consumer is also invoked from
>>> >> random threads. If it works now, it should continue to work.
>>> > I'm not sure what you're referring to. Can you expand on this?
>>>
>>> Any invocation of the consumer (e.g. the poll method) is not from a thread
>>> managed by the consumer. This is what I was assuming you meant with the
>>> term 'random thread'.
>>>
>>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>>> > have 10 green threads you're multiplexing onto one CPU thread, and that
>>> > CPU thread gets blocked because of what one green thread is doing, the
>>> > other 9 green threads are blocked too, right? I guess it's "just" a
>>> > performance problem, but it still seems like it could be a serious one.
>>>
>>> There are several ways to deal with this. All async runtimes I know
>>> (Akka, ZIO, Cats Effect) support this by letting you mark a task as
>>> blocking. The runtime will then either schedule it to another thread
>>> pool, or it will grow the thread pool to accommodate. In any case 'the
>>> other 9 green threads' will simply be scheduled to another real thread.
>>> In addition, some of these runtimes detect long-running tasks and will
>>> reschedule waiting tasks to another thread. This is all a bit off topic
>>> though.
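The hand-off described above can be sketched as follows. This is a hypothetical illustration, not KIP-944's actual code: the names `AccessKey`, `CURRENT_KEY`, and `runOnHelperThread` are invented, and the point is only that the key is published through a synchronized hand-off (here a `CompletableFuture`), which is what establishes the happens-before edge; the thread-local itself provides no memory barrier.

```java
import java.util.concurrent.CompletableFuture;

public class AccessKeyHandoff {
    // Hypothetical stand-in for KIP-944's access key.
    static final class AccessKey {
        final long id;
        AccessKey(long id) { this.id = id; }
    }

    // Each thread holds its own key; only a thread holding the key may
    // call the consumer.
    static final ThreadLocal<AccessKey> CURRENT_KEY = new ThreadLocal<>();

    // The callback thread reads its key and passes it to a helper thread,
    // which installs it in its own thread-local before running the task.
    static CompletableFuture<Void> runOnHelperThread(Runnable task) {
        AccessKey key = CURRENT_KEY.get();  // read on the callback thread
        return CompletableFuture.runAsync(() -> {
            CURRENT_KEY.set(key);           // set on the helper thread
            try {
                task.run();
            } finally {
                CURRENT_KEY.remove();       // no need to pass the key back
            }
        });
    }
}
```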
>>>
>>> > I don't see why this has to be "inherently multi-threaded." Why can't
>>> > we have the other threads report back what messages they've processed to
>>> > the worker thread? Then it will be able to handle these callbacks
>>> > without involving the other threads.
>>>
>>> Please consider the context, which is that we are running inside the
>>> callback of the rebalance listener. The only way to execute something
>>> and also have a timeout on it is to run it on another thread.
>>>
>>> Kind regards,
>>>     Erik.
>>>
>>>
>>> On 08-07-2023 at 19:17, Colin McCabe wrote:
>>> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
>>> >> Hi Colin,
>>> >>
>>> >> Thanks for your thoughts and taking the time to reply.
>>> >>
>>> >> Let me take away your concerns. None of your worries are an issue with
>>> >> the algorithm described in KIP-944. Here it goes:
>>> >>
>>> >> > It's not clear to me that it's safe to access the Kafka consumer or
>>> >> > producer concurrently from different threads.
>>> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
>>> >> to great lengths to make sure that this cannot happen.
>>> >>
>>> >> *The only design goal is to allow callbacks to call the consumer from
>>> >> another thread.*
>>> >>
>>> >> To make sure there are no more misunderstandings about this, I have
>>> >> added this goal to the KIP.
>>> >>
>>> > Hi Erik,
>>> >
>>> > Sorry, I spoke imprecisely. My concern is not concurrent access, but
>>> > multithreaded access in general. Basically cache line visibility issues.
>>> >
>>> >> > This is true even if the accesses happen at different times, because
>>> >> > modern CPUs require memory barriers to guarantee inter-thread
>>> >> > visibility of loads and stores.
>>> >> In KIP-944, the callback thread can only delegate to another thread
>>> >> after reading from and writing to a threadlocal variable, providing the
>>> >> barriers right there.
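Erik's point that a timeout requires another thread can be sketched like this (an editor's illustration with invented names, not code from the KIP): the rebalance-listener callback hands the work to a second thread and bounds its own wait with `Future.get(timeout, unit)`.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RevokeWithTimeout {
    // Run `work` on a separate thread and wait at most `timeout` for it;
    // the calling thread (e.g. one inside onPartitionsRevoked) cannot
    // time itself out, so a second thread is unavoidable here.
    static <T> T withTimeout(Callable<T> work, Duration timeout)
            throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<T> f = es.submit(work);   // runs on the helper thread
            return f.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            es.shutdownNow();                // interrupt the work if we gave up
        }
    }
}
```

On timeout, `Future.get` throws `TimeoutException` and the helper thread is interrupted, so the caller regains control even if the work is stuck.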
>>> >>
>>> > I don't see any documentation that accessing thread local variables
>>> > provides a total store or load barrier. Do you have such documentation? It
>>> > seems like if this were the case, we could eliminate volatile variables
>>> > from most of the code base.
>>> >
>>> >> > I know that there are at least a few locks in the consumer code now,
>>> >> > due to our need to send heartbeats from a worker thread. I don't think
>>> >> > those would be sufficient to protect a client that is making calls from
>>> >> > random threads.
>>> >> In the current implementation the consumer is also invoked from random
>>> >> threads. If it works now, it should continue to work.
>>> >>
>>> > I'm not sure what you're referring to. Can you expand on this?
>>> >
>>> >> > There has been some discussion of moving to a more traditional model
>>> >> > where people make calls to the client and the client passes the given
>>> >> > data to a single background worker thread. This would avoid a lot of
>>> >> > the footguns of the current model and probably better reflect how people
>>> >> > actually use the client.
>>> >> That is awesome. However, I'd rather not wait for that.
>>> >>
>>> >> > Another issue is that neither the producer nor the consumer is fully
>>> >> > nonblocking. There are some corner cases where we do in fact block. From
>>> >> > memory, the producer blocks in some "buffer full" cases, and the
>>> >> > consumer blocks sometimes when fetching metadata.
>>> >> I am aware of that. This is not an issue; all async runtimes can
>>> >> cooperate with blocking code.
>>> >>
>>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>>> > have 10 green threads you're multiplexing onto one CPU thread, and that
>>> > CPU thread gets blocked because of what one green thread is doing, the
>>> > other 9 green threads are blocked too, right? I guess it's "just" a
>>> > performance problem, but it still seems like it could be a serious one.
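The "mark a task as blocking" behavior Erik describes can be approximated in plain Java (an editor's sketch with invented names; Akka, ZIO, and Cats Effect each have their own mechanism): blocking work is shifted to a separate, elastic pool so the fixed compute pool carrying the other green threads stays free.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingPoolDemo {
    // Small fixed pool for CPU-bound tasks (the "compute" pool).
    static final ExecutorService compute = Executors.newFixedThreadPool(2);
    // Elastic pool that grows as threads block, standing in for what the
    // async runtimes do when a task is marked blocking.
    static final ExecutorService blocking = Executors.newCachedThreadPool();

    static <T> CompletableFuture<T> runBlocking(Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();  // may block; only ties up the elastic pool
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, blocking);
    }
}
```

With this split, a consumer call that blocks on "buffer full" or a metadata fetch stalls one cached-pool thread while the compute pool keeps scheduling the remaining tasks.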
>>> >
>>> >> > I suspect it would be more appropriate for Kotlin coroutines, ZIO
>>> >> > coroutines and so on to adopt this "pass messages to and from a
>>> >> > background worker thread" model than to try to re-engineer the Kafka
>>> >> > client to work from random threads.
>>> >> In both zio-kafka and fs2-kafka this is already the approach we are
>>> >> taking.
>>> >>
>>> >> Unfortunately, the Kafka consumer forces us to perform some work in
>>> >> callbacks:
>>> >>
>>> >> * commit completed callback: register that the callback is complete,
>>> >> * partition revoked callback: in this callback we need to submit
>>> >>   commits from everything consumed and processed so far, using
>>> >>   timeouts if processing takes too long. In an async runtime, this is
>>> >>   an inherently multi-threaded process. In particular, we cannot do
>>> >>   timeouts without involving multiple threads.
>>> >>
>>> > I don't see why this has to be "inherently multi-threaded." Why can't
>>> > we have the other threads report back what messages they've processed to
>>> > the worker thread? Then it will be able to handle these callbacks without
>>> > involving the other threads.
>>> >
>>> > regards,
>>> > Colin
>>> >
>>> >> I have extended the KIP's motivation to explain the major use case.
>>> >>
>>> >> Please read KIP-944 again. Even though the description is extensive
>>> >> (this callback-from-callback stuff is tricky), you will find that my
>>> >> goals are modest.
>>> >>
>>> >> Also, the implementation is just a few lines. With an understanding of
>>> >> the idea it should not be a lot of work to follow it.
>>> >>
>>> >> Kind regards,
>>> >>     Erik.
>>> >>
>>> >>
>>> >> On 07-07-2023 at 19:57, Colin McCabe wrote:
>>> >>> Hi Erik,
>>> >>>
>>> >>> It's not clear to me that it's safe to access the Kafka consumer or
>>> >>> producer concurrently from different threads. There are data structures
>>> >>> that aren't protected by locks, so I wouldn't necessarily expect accessing
>>> >>> and mutating them in a concurrent way to work. This is true even if the
>>> >>> accesses happen at different times, because modern CPUs require memory
>>> >>> barriers to guarantee inter-thread visibility of loads and stores.
>>> >>>
>>> >>> I am writing this without doing a detailed dive into the code (I
>>> >>> haven't been into the consumer / producer code in a bit.) Someone who has
>>> >>> worked more on the consumer recently might be able to give specific
>>> >>> examples of things that wouldn't work.
>>> >>>
>>> >>> I know that there are at least a few locks in the consumer code now,
>>> >>> due to our need to send heartbeats from a worker thread. I don't think
>>> >>> those would be sufficient to protect a client that is making calls from
>>> >>> random threads.
>>> >>>
>>> >>> There has been some discussion of moving to a more traditional model
>>> >>> where people make calls to the client and the client passes the given
>>> >>> data to a single background worker thread. This would avoid a lot of the
>>> >>> footguns of the current model and probably better reflect how people
>>> >>> actually use the client.
>>> >>>
>>> >>> Another issue is that neither the producer nor the consumer is fully
>>> >>> nonblocking. There are some corner cases where we do in fact block. From
>>> >>> memory, the producer blocks in some "buffer full" cases, and the consumer
>>> >>> blocks sometimes when fetching metadata.
>>> >>>
>>> >>> I suspect it would be more appropriate for Kotlin coroutines, ZIO
>>> >>> coroutines and so on to adopt this "pass messages to and from a background
>>> >>> worker thread" model than to try to re-engineer the Kafka client to work
>>> >>> from random threads.
>>> >>>
>>> >>> There is actually some good advice about how to handle multiple
>>> >>> threads in the KafkaConsumer.java header file itself. Check the sections
>>> >>> "One Consumer Per Thread" and "Decouple Consumption and Processing."
>>> >>> What I'm recommending here is essentially the latter.
>>> >>>
>>> >>> I do understand that it's frustrating to not get a quick response.
>>> >>> However, overall I think this one needs a lot more discussion before
>>> >>> getting anywhere near a vote. I will leave a -1 just as a procedural step.
>>> >>> Maybe some of the people working in the client area can also chime in.
>>> >>>
>>> >>> best,
>>> >>> Colin
>>> >>>
>>> >>>
>>> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>>> >>>> Dear PMCs,
>>> >>>>
>>> >>>> So far there have been 0 responses to KIP-944. I understand this may not
>>> >>>> be something that keeps you busy, but this KIP is important to people
>>> >>>> that use async runtimes like ZIO, Cats and Kotlin.
>>> >>>>
>>> >>>> Is there anything you need to come to a decision?
>>> >>>>
>>> >>>> Kind regards,
>>> >>>>     Erik.
>>> >>>>
>>> >>>>
>>> >>>> On 05-07-2023 at 11:38, Erik van Oosten wrote:
>>> >>>>> Hello all,
>>> >>>>>
>>> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in consumer.
>>> >>>>> It has been 'under discussion' for 7 days now. 'Under discussion'
>>> >>>>> between quotes, because there were 0 comments so far. I hope the KIP
>>> >>>>> is clear!
>>> >>>>>
>>> >>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>>> >>>>>
>>> >>>>> Kind regards,
>>> >>>>>     Erik.
>>> >>>>>
>>> >>>>>
>>> >> --
>>> >> Erik van Oosten
>>> >> e.vanoos...@grons.nl
>>> >> https://day-to-day-stuff.blogspot.com
>>>
>>> --
>>> Erik van Oosten
>>> e.vanoos...@grons.nl
>>> https://day-to-day-stuff.blogspot.com
>>>
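The "Decouple Consumption and Processing" pattern the thread keeps returning to can be sketched roughly as follows. This is a hedged, broker-free illustration with invented names: no real `KafkaConsumer` appears, records are plain strings, and the commit call itself is omitted. The point is the direction of the message flow that Colin recommends: the single consumer-owning thread hands batches to workers, and the workers only report completed offsets back; they never touch the consumer.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DecoupledProcessing {
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    // Workers report back through this queue; only the polling thread reads it.
    private final ConcurrentLinkedQueue<Long> completed = new ConcurrentLinkedQueue<>();

    // Called from the single consumer-owning thread after poll():
    // hand the batch to workers, never the consumer itself.
    public void dispatch(List<String> batch, long baseOffset) {
        for (int i = 0; i < batch.size(); i++) {
            final long offset = baseOffset + i;
            final String record = batch.get(i);
            workers.submit(() -> {
                process(record);        // expensive work off the consumer thread
                completed.add(offset);  // report back; workers never commit
            });
        }
    }

    // Also called from the consumer thread: fold the reported offsets so that
    // it alone could issue the commit (the commit call is not shown here).
    public long highestCompleted() {
        long max = -1;
        for (Long o; (o = completed.poll()) != null; ) max = Math.max(max, o);
        return max;
    }

    private void process(String record) { /* application logic goes here */ }

    public void shutdown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because commits and rebalance callbacks then happen only on the thread that owns the consumer, no cross-thread access to the consumer is needed at all.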