Hey Erik - One more thing I want to add to my comment: we are in the process
of rewriting the KafkaConsumer, and I think your proposal would work in
the new consumer because we are going to separate the user thread and the
background thread.  Here is the 1-pager, and we are in the process of
converting it into KIP-945.
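
As a rough illustration of what separating the user thread from a
background thread can look like, here is a minimal Java sketch. It is not
taken from the 1-pager or from KIP-945; the class SingleThreadOwner, its
methods, and the thread name are hypothetical. The only assumption is that
a non-thread-safe resource is touched by exactly one background thread,
and callers on any thread submit requests and get a future back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch: confines a non-thread-safe resource to one background thread.
final class SingleThreadOwner<R> {
    // Only ever read or mutated on the background thread.
    private final R resource;

    // A single daemon thread that executes every submitted request in order.
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "background-owner");
        t.setDaemon(true);
        return t;
    });

    SingleThreadOwner(R resource) {
        this.resource = resource;
    }

    // Callable from any user thread; the request itself runs on the background thread.
    <T> CompletableFuture<T> submit(Function<R, T> request) {
        return CompletableFuture.supplyAsync(() -> request.apply(resource), background);
    }

    // Stops accepting new requests; already queued requests still run.
    void close() {
        background.shutdown();
    }
}
```

With this shape, a callback that runs on some other thread never touches
the resource directly; it only enqueues a request, so there is no
multi-threaded access to the resource itself.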

Thanks,
P

On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:

> Hey Erik,
>
> Sorry for holding up this email for a few days since Colin's response
> includes some of my concerns.  I'm in favor of this KIP, and I think your
> approach seems safe.  Of course, I have probably missed something, so I
> think this KIP needs to cover different use cases to demonstrate that it
> doesn't cause any unsafe access. I think this can be demonstrated via
> diagrams and some code in the KIP.
>
> Thanks,
> P
>
> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
> <e.vanoos...@grons.nl.invalid> wrote:
>
>> Hello Colin,
>>
>>  >> In KIP-944, the callback thread can only delegate to another thread
>> after reading from and writing to a threadlocal variable, providing the
>> barriers right there.
>>
>>  > I don't see any documentation that accessing thread local variables
>> provides a total store or load barrier. Do you have such documentation?
>> It seems like if this were the case, we could eliminate volatile
>> variables from most of the code base.
>>
>> Now I was imprecise. The thread-locals are only somewhat involved. In
>> the KIP proposal the callback thread reads an access key from a
>> thread-local variable. It then needs to pass that access key to another
>> thread, which then can set it on its own thread-local variable. The act
>> of passing a value from one thread to another implies that a memory
>> barrier needs to be passed. However, this is all not so relevant since
>> there is no need to pass the access key back when the other thread is
>> done.
>>
>> But now that I think about it a bit more, the locking mechanism runs in a
>> synchronized block. If I remember correctly this should be enough to
>> pass read and write barriers.
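
The point about synchronized blocks can be sketched in Java. This is a
minimal illustration, not the locking code from KIP-944; the class
AccessKeyHolder and its methods are hypothetical. It relies on the Java
memory model rule that an unlock of a monitor happens-before every
subsequent lock of that same monitor, so plain (non-volatile) fields
written inside one synchronized method are visible to another thread that
later enters a synchronized method on the same object.

```java
// Hypothetical illustration; not the locking code from KIP-944.
final class AccessKeyHolder {
    // Plain (non-volatile) fields: visibility comes only from the monitor on "this".
    private Object key;
    private int payload;

    // Runs on the publishing thread; leaving the method releases the monitor.
    synchronized void publish(Object newKey, int newPayload) {
        payload = newPayload;
        key = newKey;
    }

    // Runs on the receiving thread; entering the method acquires the same monitor,
    // so everything written before the matching release is visible here.
    synchronized int readIfPublished(Object expectedKey) {
        return key == expectedKey ? payload : -1;
    }

    // Hands a value from a helper thread to the calling thread.
    static int demo() {
        AccessKeyHolder holder = new AccessKeyHolder();
        Object accessKey = new Object();
        Thread publisher = new Thread(() -> holder.publish(accessKey, 42));
        publisher.start();
        int seen;
        while ((seen = holder.readIfPublished(accessKey)) == -1) {
            Thread.yield(); // poll until the other thread has published
        }
        return seen;
    }
}
```

The guarantee only covers state that is read and written while holding
that monitor; fields touched outside the synchronized methods get no
visibility guarantee from it.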
>>
>>  >> In the current implementation the consumer is also invoked from
>> random threads. If it works now, it should continue to work.
>>  > I'm not sure what you're referring to. Can you expand on this?
>>
>> Any invocation of the consumer (e.g. method poll) is not from a thread
>> managed by the consumer. This is what I was assuming you meant with the
>> term 'random thread'.
>>
>>  > Hmm, not sure what you mean by "cooperate with blocking code." If you
>> have 10 green threads you're multiplexing on to one CPU thread, and that
>> CPU thread gets blocked because of what one green thread is doing, the
>> other 9 green threads are blocked too, right? I guess it's "just" a
>> performance problem, but it still seems like it could be a serious one.
>>
>> There are several ways to deal with this. All async runtimes I know
>> (Akka, ZIO, Cats Effect) support this by letting you mark a task as
>> blocking. The runtime will then either schedule it to another
>> thread-pool, or it will grow the thread-pool to accommodate. In any case
>> 'the other 9 green threads' will simply be scheduled to another real
>> thread. In addition, some of these runtimes detect long running tasks
>> and will reschedule waiting tasks to another thread. This is all a bit
>> off topic though.
>>
>>  > I don't see why this has to be "inherently multi-threaded." Why can't
>> we have the other threads report back what messages they've processed to
>> the worker thread. Then it will be able to handle these callbacks
>> without involving the other threads.
>>
>> Please consider the context which is that we are running inside the
>> callback of the rebalance listener. The only way to execute something
>> and also have a timeout on it is to run the something on another thread.
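
The constraint described above (a timeout requires a second thread) can
be sketched with plain java.util.concurrent. This is only an
illustration under assumptions: the class TimedCall and the method
callWithTimeout are hypothetical, and the sketch does not show how
zio-kafka, fs2-kafka or KIP-944 implement it. The calling thread, which
in the scenario above would be the thread running the rebalance listener
callback, waits for a bounded time while the work runs elsewhere.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a task on another thread and waits a bounded time for it.
final class TimedCall {
    static <T> T callWithTimeout(Callable<T> task, long timeoutMillis, T fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            // The calling thread blocks here, but never longer than the timeout.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; the task may still be winding down
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If the task itself needs to call back into the consumer, that call now
comes from the worker thread rather than the callback thread, which is
the situation the KIP discussion is about.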
>>
>> Kind regards,
>>      Erik.
>>
>>
>> Op 08-07-2023 om 19:17 schreef Colin McCabe:
>> > On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
>> >> Hi Colin,
>> >>
>> >> Thanks for your thoughts and taking the time to reply.
>> >>
>> >> Let me take away your concerns. None of your worries are an issue with
>> >> the algorithm described in KIP-944. Here it goes:
>> >>
>> >>   > It's not clear to me that it's safe to access the Kafka consumer or
>> >>> producer concurrently from different threads.
>> >> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
>> >> to great lengths to make sure that this cannot happen.
>> >>
>> >> *The only design goal is to allow callbacks to call the consumer from
>> >> another thread.*
>> >>
>> >> To make sure there are no more misunderstandings about this, I have
>> >> added this goal to the KIP.
>> >>
>> > Hi Erik,
>> >
>> > Sorry, I spoke imprecisely. My concern is not concurrent access, but
>> multithreaded access in general. Basically cache line visibility issues.
>> >
>> >>   > This is true even if the accesses happen at different times,
>> because
>> >>> modern CPUs require memory barriers to guarantee inter-thread
>> visibility
>> >>> of loads and stores.
>> >> In KIP-944, the callback thread can only delegate to another thread
>> >> after reading from and writing to a threadlocal variable, providing the
>> >> barriers right there.
>> >>
>> > I don't see any documentation that accessing thread local variables
>> provides a total store or load barrier. Do you have such documentation? It
>> seems like if this were the case, we could eliminate volatile variables
>> from most of the code base.
>> >
>> >>   > I know that there are at least a few locks in the consumer code
>> now,
>> >>> due to our need to send heartbeats from a worker thread. I don't think
>> >>> those would be sufficient to protect a client that is making calls
>> from
>> >>> random threads.
>> >> In the current implementation the consumer is also invoked from random
>> >> threads. If it works now, it should continue to work.
>> >>
>> > I'm not sure what you're referring to. Can you expand on this?
>> >
>> >>   > There has been some discussion of moving to a more traditional
>> model
>> >>> where people make calls to the client and the client passes the given
>> >>> data to a single background worker thread. This would avoid a lot of
>> >>> the footguns of the current model and probably better reflect how
>> people
>> >>> actually use the client.
>> >> That is awesome. However, I'd rather not wait for that.
>> >>
>> >>   > Another issue is that neither the producer nor the consumer is
>> fully
>> >>> nonblocking. There are some corner cases where we do in fact block.
>> From
>> >>> memory, the producer blocks in some "buffer full" cases, and the
>> >>> consumer blocks sometimes when fetching metadata.
>> >> I am aware of that. This is not an issue; all async runtimes can
>> >> cooperate with blocking code.
>> >>
>> > Hmm, not sure what you mean by "cooperate with blocking code." If you
>> have 10 green threads you're multiplexing on to one CPU thread, and that
>> CPU thread gets blocked because of what one green thread is doing, the
>> other 9 green threads are blocked too, right? I guess it's "just" a
>> performance problem, but it still seems like it could be a serious one.
>> >
>> >>   > I suspect it would be more appropriate for Kotlin coroutines, Zio
>> >>> coroutines and so on to adopt this "pass messages to and from a
>> >>> background worker thread" model than to try to re-engineer the Kafka
>> >>> client to work from random threads.
>> >> In both zio-kafka and fs2-kafka this is already the approach we are
>> taking.
>> >>
>> >> Unfortunately, the Kafka consumer forces us to perform some work in
>> >> callbacks:
>> >>
>> >>    * commit completed callback: register that the callback is complete,
>> >>    * partition revoked callback: in this callback we need to submit
>> >>      commits from everything consumed and processed so far, using
>> >>      timeouts if processing takes too long. In an async runtime, this is
>> >>      an inherently multi-threaded process. Especially, we cannot do
>> >>      timeouts without involving multiple threads.
>> >>
>> > I don't see why this has to be "inherently multi-threaded." Why can't
>> we have the other threads report back what messages they've processed to
>> the worker thread. Then it will be able to handle these callbacks without
>> involving the other threads.
>> >
>> > regards,
>> > Colin
>> >
>> >> I have extended the KIP's motivation to explain the major use case.
>> >>
>> >> Please read KIP-944 again. Even though the description is extensive
>> >> (this callback from callback stuff is tricky), you will find that my
>> >> goals are modest.
>> >>
>> >> Also the implementation is just a few lines. With understanding of the
>> >> idea it should not be a lot of work to follow it.
>> >>
>> >> Kind regards,
>> >>       Erik.
>> >>
>> >>
>> >> Op 07-07-2023 om 19:57 schreef Colin McCabe:
>> >>> Hi Erik,
>> >>>
>> >>> It's not clear to me that it's safe to access the Kafka consumer or
>> producer concurrently from different threads. There are data structures
>> that aren't protected by locks, so I wouldn't necessarily expect accessing
>> and mutating them in a concurrent way to work. This is true even if the
>> accesses happen at different times, because modern CPUs require memory
>> barriers to guarantee inter-thread visibility of loads and stores.
>> >>>
>> >>> I am writing this without doing a detailed dive into the code (I
>> haven't been into the consumer / producer code in a bit.) Someone who has
>> worked more on the consumer recently might be able to give specific
>> examples of things that wouldn't work.
>> >>>
>> >>> I know that there are at least a few locks in the consumer code now,
>> due to our need to send heartbeats from a worker thread. I don't think
>> those would be sufficient to protect a client that is making calls from
>> random threads.
>> >>>
>> >>> There has been some discussion of moving to a more traditional model
>> where people make calls to the client and the client passes the given data
>> to a single background worker thread. This would avoid a lot of the
>> footguns of the current model and probably better reflect how people
>> actually use the client.
>> >>>
>> >>> Another issue is that neither the producer nor the consumer is fully
>> nonblocking. There are some corner cases where we do in fact block. From
>> memory, the producer blocks in some "buffer full" cases, and the consumer
>> blocks sometimes when fetching metadata.
>> >>>
>> >>> I suspect it would be more appropriate for Kotlin coroutines, Zio
>> coroutines and so on to adopt this "pass messages to and from a background
>> worker thread" model than to try to re-engineer the Kafka client to work
>> from random threads.
>> >>>
>> >>> There is actually some good advice about how to handle multiple
>> threads in the KafkaConsumer.java header file itself. Check the sections
>> "One Consumer Per Thread" and "Decouple Consumption and Processing." What
>> I'm recommending here is essentially the latter.
>> >>>
>> >>> I do understand that it's frustrating to not get a quick response.
>> However, overall I think this one needs a lot more discussion before
>> getting anywhere near a vote. I will leave a -1 just as a procedural step.
>> Maybe some of the people working in the client area can also chime in.
>> >>>
>> >>> best,
>> >>> Colin
>> >>>
>> >>>
>> >>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>> >>>> Dear PMCs,
>> >>>>
>> >>>> So far there have been 0 responses to KIP-944. I understand this may
>> not
>> >>>> be something that keeps you busy, but this KIP is important to people
>> >>>> that use async runtimes like Zio, Cats and Kotlin.
>> >>>>
>> >>>> Is there anything you need to come to a decision?
>> >>>>
>> >>>> Kind regards,
>> >>>>        Erik.
>> >>>>
>> >>>>
>> >>>> Op 05-07-2023 om 11:38 schreef Erik van Oosten:
>> >>>>> Hello all,
>> >>>>>
>> >>>>> I'd like to call a vote on KIP-944 Support async runtimes in
>> consumer.
>> >>>>> It has been 'under discussion' for 7 days now. 'Under
>> discussion'
>> >>>>> between quotes, because there were 0 comments so far. I hope the KIP
>> >>>>> is clear!
>> >>>>>
>> >>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>> >>>>>
>> >>>>> Kind regards,
>> >>>>>       Erik.
>> >>>>>
>> >>>>>
>> >> --
>> >> Erik van Oosten
>> >> e.vanoos...@grons.nl
>> >> https://day-to-day-stuff.blogspot.com
>>
>> --
>> Erik van Oosten
>> e.vanoos...@grons.nl
>> https://day-to-day-stuff.blogspot.com
>>
>>
