Hi Philip & Erik,

Hmm... if we agree that KIP-945 addresses this use case, I think it would be 
better to just focus on that KIP. Fundamentally it's a better and cleaner model 
than a complex scheme involving thread-local variables. I really don't want to 
be debugging complex interactions between Java thread-local variables and green 
threads.

It also generally helps to have some use-cases in mind when writing these 
things. If we get feedback about what would be useful for async runtimes, that 
would probably help improve and focus KIP-945. By the way, I can see you have a 
draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I 
assume it's not ready for review yet ;)

best,
Colin


On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:
> Hey Erik - Another thing I want to add to my comment: we are in the process
> of rewriting the KafkaConsumer, and I think your proposal would work in
> the new consumer because we are going to separate the user thread and the
> background thread.  Here is the 1-pager, and we are in the process of
> converting it into KIP-945.
>
> Thanks,
> P
>
> On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:
>
>> Hey Erik,
>>
>> Sorry for holding on to this email for a few days, since Colin's response
>> includes some of my concerns.  I'm in favor of this KIP, and your
>> approach seems safe.  Of course, I may have missed something, so I
>> think the KIP needs to cover different use cases to demonstrate that it doesn't
>> cause any unsafe access. This can be demonstrated via diagrams and
>> some code in the KIP.
>>
>> Thanks,
>> P
>>
>> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
>> <e.vanoos...@grons.nl.invalid> wrote:
>>
>>> Hello Colin,
>>>
>>>  >> In KIP-944, the callback thread can only delegate to another thread
>>> after reading from and writing to a threadlocal variable, providing the
>>> barriers right there.
>>>
>>>  > I don't see any documentation that accessing thread local variables
>>> provides a total store or load barrier. Do you have such documentation?
>>> It seems like if this were the case, we could eliminate volatile
>>> variables from most of the code base.
>>>
>>> Admittedly, I was imprecise there. The thread-locals are only somewhat involved. In
>>> the KIP proposal the callback thread reads an access key from a
>>> thread-local variable. It then needs to pass that access key to another
>>> thread, which can then set it on its own thread-local variable. The act
>>> of passing a value from one thread to another implies that a memory
>>> barrier is crossed. However, this is not all that relevant, since
>>> there is no need to pass the access key back when the other thread is
>>> done.
>>>
>>> But now that I think about it a bit more, the locking mechanism runs in a
>>> synchronized block. If I remember correctly, that should be enough to
>>> provide the read and write barriers.
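>>>
>>> To make that hand-over concrete, here is a minimal Java sketch of how I
>>> picture it. The names (AccessKeySketch, ACCESS_KEY, delegate) are made up
>>> for illustration; this is not the KIP's actual code:
>>>
>>>    import java.util.concurrent.ExecutorService;
>>>    import java.util.concurrent.Future;
>>>
>>>    // Illustration only: the callback thread reads its access key and hands
>>>    // it, together with the task, to another thread. The hand-over itself
>>>    // (here: submitting to an executor) is what establishes the
>>>    // happens-before edge; the ThreadLocal provides no barrier on its own.
>>>    final class AccessKeySketch {
>>>        private static final ThreadLocal<Long> ACCESS_KEY = new ThreadLocal<>();
>>>
>>>        static Future<?> delegate(ExecutorService executor, Runnable task) {
>>>            final Long key = ACCESS_KEY.get();   // read on the callback thread
>>>            return executor.submit(() -> {
>>>                ACCESS_KEY.set(key);             // adopt the key on the other thread
>>>                try {
>>>                    task.run();                  // may now call the consumer
>>>                } finally {
>>>                    ACCESS_KEY.remove();         // no need to pass the key back
>>>                }
>>>            });
>>>        }
>>>    }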
>>>
>>>  >> In the current implementation the consumer is also invoked from
>>> random threads. If it works now, it should continue to work.
>>>  > I'm not sure what you're referring to. Can you expand on this?
>>>
>>> Any invocation of the consumer (e.g. the poll method) comes from a thread
>>> that is not managed by the consumer. That is what I assumed you meant by the
>>> term 'random thread'.
>>>
>>>  > Hmm, not sure what you mean by "cooperate with blocking code." If you
>>> have 10 green threads you're multiplexing on to one CPU thread, and that
>>> CPU thread gets blocked because of what one green thread is doing, the
>>> other 9 green threads are blocked too, right? I guess it's "just" a
>>> performance problem, but it still seems like it could be a serious one.
>>>
>>> There are several ways to deal with this. All async runtimes I know of
>>> (Akka, ZIO, Cats Effect) support this by letting you mark a task as
>>> blocking. The runtime will then either schedule it onto another
>>> thread pool, or grow the thread pool to accommodate it. In either case
>>> 'the other 9 green threads' will simply be scheduled onto another real
>>> thread. In addition, some of these runtimes detect long-running tasks
>>> and will reschedule waiting tasks to another thread. This is all a bit
>>> off topic though.
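>>>
>>> For readers more at home in plain Java, a rough analogue of "mark a task as
>>> blocking" (not the actual Akka/ZIO/Cats Effect API; 'consumer' is assumed
>>> to be in scope and only ever touched from the blocking pool):
>>>
>>>    // Rough analogue: blocking calls go to a separate, growable pool so the
>>>    // fixed-size compute pool backing the green threads is never starved.
>>>    ExecutorService compute  = Executors.newFixedThreadPool(
>>>            Runtime.getRuntime().availableProcessors());
>>>    ExecutorService blocking = Executors.newCachedThreadPool(); // grows on demand
>>>
>>>    Future<ConsumerRecords<String, String>> batch =
>>>            blocking.submit(() -> consumer.poll(Duration.ofMillis(100)));
>>>    // meanwhile the compute pool keeps scheduling the other green threads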
>>>
>>>  > I don't see why this has to be "inherently multi-threaded." Why can't
>>> we have the other threads report back what messages they've processed to
>>> the worker thread? Then it will be able to handle these callbacks
>>> without involving the other threads.
>>>
>>> Please consider the context: we are running inside the
>>> callback of the rebalance listener. The only way to execute something
>>> and also have a timeout on it is to run that something on another thread.
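>>>
>>> To illustrate with a minimal Java sketch (flushPendingCommits is a made-up
>>> placeholder for "submit commits for everything consumed and processed so
>>> far"; imports from java.util.concurrent are assumed):
>>>
>>>    // Inside onPartitionsRevoked: the only way to bound the time spent on
>>>    // the flush is to run it on another thread and wait with a timeout on
>>>    // the callback thread. With KIP-944 that other thread would be allowed
>>>    // to call the consumer to actually commit.
>>>    void revokeWithTimeout(ExecutorService worker, Runnable flushPendingCommits)
>>>            throws InterruptedException, ExecutionException {
>>>        Future<?> flush = worker.submit(flushPendingCommits);
>>>        try {
>>>            flush.get(5, TimeUnit.SECONDS);   // bounded wait
>>>        } catch (TimeoutException e) {
>>>            flush.cancel(true);               // give up when the deadline passes
>>>        }
>>>    }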
>>>
>>> Kind regards,
>>>      Erik.
>>>
>>>
>>> On 08-07-2023 at 19:17, Colin McCabe wrote:
>>> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
>>> >> Hi Colin,
>>> >>
>>> >> Thanks for your thoughts and taking the time to reply.
>>> >>
>>> >> Let me take away your concerns. None of your worries are an issue with
>>> >> the algorithm described in KIP-944. Here goes:
>>> >>
>>> >>   > It's not clear to me that it's safe to access the Kafka consumer or
>>> >>> producer concurrently from different threads.
>>> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
>>> >> to great lengths to make sure that this cannot happen.
>>> >>
>>> >> *The only design goal is to allow callbacks to call the consumer from
>>> >> another thread.*
>>> >>
>>> >> To make sure there are no more misunderstandings about this, I have
>>> >> added this goal to the KIP.
>>> >>
>>> > Hi Erik,
>>> >
>>> > Sorry, I spoke imprecisely. My concern is not concurrent access, but
>>> multithreaded access in general. Basically cache line visibility issues.
>>> >
>>> >>   > This is true even if the accesses happen at different times,
>>> because
>>> >>> modern CPUs require memory barriers to guarantee inter-thread
>>> visibility
>>> >>> of loads and stores.
>>> >> In KIP-944, the callback thread can only delegate to another thread
>>> >> after reading from and writing to a threadlocal variable, providing the
>>> >> barriers right there.
>>> >>
>>> > I don't see any documentation that accessing thread local variables
>>> provides a total store or load barrier. Do you have such documentation? It
>>> seems like if this were the case, we could eliminate volatile variables
>>> from most of the code base.
>>> >
>>> >>   > I know that there are at least a few locks in the consumer code
>>> now,
>>> >>> due to our need to send heartbeats from a worker thread. I don't think
>>> >>> those would be sufficient to protect a client that is making calls
>>> from
>>> >>> random threads.
>>> >> In the current implementation the consumer is also invoked from random
>>> >> threads. If it works now, it should continue to work.
>>> >>
>>> > I'm not sure what you're referring to. Can you expand on this?
>>> >
>>> >>   > There has been some discussion of moving to a more traditional
>>> model
>>> >>> where people make calls to the client and the client passes the given
>>> >>> data to a single background worker thread. This would avoid a lot of
>>> >>> the footguns of the current model and probably better reflect how
>>> people
>>> >>> actually use the client.
>>> >> That is awesome. However, I'd rather not wait for that.
>>> >>
>>> >>   > Another issue is that neither the producer nor the consumer is
>>> fully
>>> >>> nonblocking. There are some corner cases where we do in fact block.
>>> From
>>> >>> memory, the producer blocks in some "buffer full" cases, and the
>>> >>> consumer blocks sometimes when fetching metadata.
>>> >> I am aware of that. This is not an issue; all async runtimes can
>>> >> cooperate with blocking code.
>>> >>
>>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>>> have 10 green threads you're multiplexing on to one CPU thread, and that
>>> CPU thread gets blocked because of what one green thread is doing, the
>>> other 9 green threads are blocked too, right? I guess it's "just" a
>>> performance problem, but it still seems like it could be a serious one.
>>> >
>>> >>   > I suspect it would be more appropriate for Kotlin coroutines, Zio
>>> >>> coroutines and so on to adopt this "pass messages to and from a
>>> >>> background worker thread" model than to try to re-engineer the Kafka
>>> >>> client to work from random threads.
>>> >> In both zio-kafka and fs2-kafka this is already the approach we are
>>> taking.
>>> >>
>>> >> Unfortunately, the Kafka consumer forces us to perform some work in
>>> >> callbacks:
>>> >>
>>> >>    * commit completed callback: register that the commit is complete,
>>> >>    * partition revoked callback: in this callback we need to submit
>>> >>      commits from everything consumed and processed so far, using
>>> >>      timeouts if processing takes too long. In an async runtime, this is
>>> >>      an inherently multi-threaded process. In particular, we cannot do
>>> >>      timeouts without involving multiple threads.
>>> >>
>>> > I don't see why this has to be "inherently multi-threaded." Why can't
>>> we have the other threads report back what messages they've processed to
>>> the worker thread? Then it will be able to handle these callbacks without
>>> involving the other threads.
>>> >
>>> > regards,
>>> > Colin
>>> >
>>> >> I have extended the KIP's motivation to explain the major use case.
>>> >>
>>> >> Please read KIP-944 again. Even though the description is extensive
>>> >> (this callback-from-callback stuff is tricky), you will find that my
>>> >> goals are modest.
>>> >>
>>> >> Also, the implementation is just a few lines. Once you understand the
>>> >> idea, it should not be a lot of work to follow it.
>>> >>
>>> >> Kind regards,
>>> >>       Erik.
>>> >>
>>> >>
>>> >> On 07-07-2023 at 19:57, Colin McCabe wrote:
>>> >>> Hi Erik,
>>> >>>
>>> >>> It's not clear to me that it's safe to access the Kafka consumer or
>>> producer concurrently from different threads. There are data structures
>>> that aren't protected by locks, so I wouldn't necessarily expect accessing
>>> and mutating them in a concurrent way to work. This is true even if the
>>> accesses happen at different times, because modern CPUs require memory
>>> barriers to guarantee inter-thread visibility of loads and stores.
>>> >>>
>>> >>> I am writing this without doing a detailed dive into the code (I
>>> haven't been in the consumer / producer code in a bit). Someone who has
>>> worked more on the consumer recently might be able to give specific
>>> examples of things that wouldn't work.
>>> >>>
>>> >>> I know that there are at least a few locks in the consumer code now,
>>> due to our need to send heartbeats from a worker thread. I don't think
>>> those would be sufficient to protect a client that is making calls from
>>> random threads.
>>> >>>
>>> >>> There has been some discussion of moving to a more traditional model
>>> where people make calls to the client and the client passes the given data
>>> to a single background worker thread. This would avoid a lot of the
>>> footguns of the current model and probably better reflect how people
>>> actually use the client.
>>> >>>
>>> >>> Another issue is that neither the producer nor the consumer is fully
>>> nonblocking. There are some corner cases where we do in fact block. From
>>> memory, the producer blocks in some "buffer full" cases, and the consumer
>>> blocks sometimes when fetching metadata.
>>> >>>
>>> >>> I suspect it would be more appropriate for Kotlin coroutines, Zio
>>> coroutines and so on to adopt this "pass messages to and from a background
>>> worker thread" model  than to try to re-engineer the Kafka client ot work
>>> from random threads.
>>> >>>
>>> >>> There is actually some good advice about how to handle multiple
>>> threads in the KafkaConsumer.java header file itself. Check the sections
>>> "One Consumer Per Thread" and "Decouple Consumption and Processing." What
>>> I'm recommending here is essentially the latter.
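>>> >>>
>>> >>> A minimal sketch of the "Decouple Consumption and Processing" shape I mean
>>> >>> (the queue layout and names are illustrative, not a prescribed API):
>>> >>>
>>> >>>    // One thread owns the consumer; workers never touch it. Workers report
>>> >>>    // processed offsets back over a queue, so commits (and rebalance
>>> >>>    // callbacks) all run on the owning thread.
>>> >>>    void pollLoop(KafkaConsumer<String, String> consumer,
>>> >>>                  BlockingQueue<ConsumerRecords<String, String>> toWorkers,
>>> >>>                  BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> processed)
>>> >>>            throws InterruptedException {
>>> >>>        while (!Thread.currentThread().isInterrupted()) {
>>> >>>            toWorkers.put(consumer.poll(Duration.ofMillis(100)));
>>> >>>            Map<TopicPartition, OffsetAndMetadata> done = processed.poll();
>>> >>>            if (done != null) {
>>> >>>                consumer.commitAsync(done, null);  // still on the owning thread
>>> >>>            }
>>> >>>        }
>>> >>>    }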
>>> >>>
>>> >>> I do understand that it's frustrating to not get a quick response.
>>> However, overall I think this one needs a lot more discussion before
>>> getting anywhere near a vote. I will leave a -1 just as a procedural step.
>>> Maybe some of the people working in the client area can also chime in.
>>> >>>
>>> >>> best,
>>> >>> Colin
>>> >>>
>>> >>>
>>> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>>> >>>> Dear PMCs,
>>> >>>>
>>> >>>> So far there have been 0 responses to KIP-944. I understand this may
>>> not
>>> >>>> be something that keeps you busy, but this KIP is important to people
>>> >>>> that use async runtimes like Zio, Cats and Kotlin.
>>> >>>>
>>> >>>> Is there anything you need to come to a decision?
>>> >>>>
>>> >>>> Kind regards,
>>> >>>>        Erik.
>>> >>>>
>>> >>>>
>>> >>>> On 05-07-2023 at 11:38, Erik van Oosten wrote:
>>> >>>>> Hello all,
>>> >>>>>
>>> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in
>>> consumer.
>>> >>>>> It has been 'under discussion' for 7 days now. 'Under
>>> discussion'
>>> >>>>> in quotes, because there have been 0 comments so far. I hope the KIP
>>> >>>>> is clear!
>>> >>>>>
>>> >>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>>> >>>>>
>>> >>>>> Kind regards,
>>> >>>>>       Erik.
>>> >>>>>
>>> >>>>>
>>> >> --
>>> >> Erik van Oosten
>>> >> e.vanoos...@grons.nl
>>> >> https://day-to-day-stuff.blogspot.com
>>>
>>> --
>>> Erik van Oosten
>>> e.vanoos...@grons.nl
>>> https://day-to-day-stuff.blogspot.com
>>>
>>>
