Hi Erik,

Thanks for the KIP!

I empathize with your frustration over the radio silence. It gets like that 
sometimes, and I apologize for my lack of feedback.

I’d personally like to see this lively exchange move over to the DISCUSS thread 
you’d created before.

Thanks,
Kirk

> On Jul 14, 2023, at 1:33 AM, Erik van Oosten <e.vanoos...@grons.nl.INVALID> 
> wrote:
> 
> Hi Colin,
> 
> The way I understood Philip's message is that KIP-944 also plays nice with 
> KIP-945. But I might be mistaken.
> 
> Regardless, KIP-945 does /not/ resolve the underlying problem (the need for 
> nested consumer invocations) because it has the explicit goal of not changing 
> the user facing API.
> 
> > ... KIP-945 but haven't posted a DISCUSS thread yet
> 
> There is a thread called 'KafkaConsumer refactor proposal', but indeed no 
> official discussion yet.
> 
> > I really don't want to be debugging complex interactions between Java 
> > thread-local variables and green threads.
> 
> In that email thread, I proposed an API change in which callbacks are no 
> longer needed. The proposal completely removes the need for such complex 
> interactions. In addition, it gives clients the ability to process at full 
> speed even while a cooperative rebalance is ongoing.
> 
> Regards,
>     Erik.
> 
> Op 14-07-2023 om 00:36 schreef Colin McCabe:
>> Hi Philip & Erik,
>> 
>> Hmm... if we agree that KIP-945 addresses this use case, I think it would be 
>> better to just focus on that KIP. Fundamentally it's a better and cleaner 
>> model than a complex scheme involving thread-local variables. I really don't 
>> want to be debugging complex interactions between Java thread-local 
>> variables and green threads.
>> 
>> It also generally helps to have some use-cases in mind when writing these 
>> things. If we get feedback about what would be useful for async runtimes, 
>> that would probably help improve and focus KIP-945. By the way, I can see 
>> you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread 
>> yet, so I assume it's not ready for review yet ;)
>> 
>> best,
>> Colin
>> 
>> 
>> On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:
>>> Hey Erik - Another thing I want to add to my comment: we are in the process
>>> of rewriting the KafkaConsumer, and I think your proposal would work in
>>> the new consumer because we are going to separate the user thread and the
>>> background thread.  Here is the 1-pager, and we are in the process of
>>> converting this into KIP-945.
>>> 
>>> Thanks,
>>> P
>>> 
>>> On Tue, Jul 11, 2023 at 10:33 AM Philip Nee <philip...@gmail.com> wrote:
>>> 
>>>> Hey Erik,
>>>> 
>>>> Sorry for holding up this email for a few days since Colin's response
>>>> includes some of my concerns.  I'm in favor of this KIP, and I think your
>>>> approach seems safe.  Of course, I probably missed something, so I
>>>> think this KIP needs to cover different use cases to demonstrate it doesn't
>>>> cause any unsafe access. I think this can be demonstrated via diagrams and
>>>> some code in the KIP.
>>>> 
>>>> Thanks,
>>>> P
>>>> 
>>>> On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
>>>> <e.vanoos...@grons.nl.invalid> wrote:
>>>> 
>>>>> Hello Colin,
>>>>> 
>>>>>  >> In KIP-944, the callback thread can only delegate to another thread
>>>>> after reading from and writing to a threadlocal variable, providing the
>>>>> barriers right there.
>>>>> 
>>>>>  > I don't see any documentation that accessing thread local variables
>>>>> provides a total store or load barrier. Do you have such documentation?
>>>>> It seems like if this were the case, we could eliminate volatile
>>>>> variables from most of the code base.
>>>>> 
>>>>> Now I was imprecise. The thread-locals are only somewhat involved. In
>>>>> the KIP proposal the callback thread reads an access key from a
>>>>> thread-local variable. It then needs to pass that access key to another
>>>>> thread, which then can set it on its own thread-local variable. The act
>>>>> of passing a value from one thread to another implies that a memory
>>>>> barrier needs to be passed. However, this is all not so relevant since
>>>>> there is no need to pass the access key back when the other thread is
>>>>> done.
>>>>> 
>>>>> But now that I think about it a bit more, the locking mechanism runs in a
>>>>> synchronized block. If I remember correctly this should be enough to
>>>>> pass read and write barriers.
>>>>> 
>>>>>  >> In the current implementation the consumer is also invoked from
>>>>> random threads. If it works now, it should continue to work.
>>>>>  > I'm not sure what you're referring to. Can you expand on this?
>>>>> 
>>>>> Any invocation of the consumer (e.g. method poll) is not from a thread
>>>>> managed by the consumer. This is what I was assuming you meant with the
>>>>> term 'random thread'.
>>>>> 
>>>>>  > Hmm, not sure what you mean by "cooperate with blocking code." If you
>>>>> have 10 green threads you're multiplexing on to one CPU thread, and that
>>>>> CPU thread gets blocked because of what one green thread is doing, the
>>>>> other 9 green threads are blocked too, right? I guess it's "just" a
>>>>> performance problem, but it still seems like it could be a serious one.
>>>>> 
>>>>> There are several ways to deal with this. All async runtimes I know
>>>>> (Akka, Zio, Cats-effects) support this by letting you mark a task as
>>>>> blocking. The runtime will then either schedule it to another
>>>>> thread-pool, or it will grow the thread-pool to accommodate. In any case
>>>>> 'the other 9 green threads' will simply be scheduled to another real
>>>>> thread. In addition, some of these runtimes detect long running tasks
>>>>> and will reschedule waiting tasks to another thread. This is all a bit
>>>>> off topic though.
>>>>> 
>>>>>  > I don't see why this has to be "inherently multi-threaded." Why can't
>>>>> we have the other threads report back what messages they've processed to
>>>>> the worker thread. Then it will be able to handle these callbacks
>>>>> without involving the other threads.
>>>>> 
>>>>> Please consider the context which is that we are running inside the
>>>>> callback of the rebalance listener. The only way to execute something
>>>>> and also have a timeout on it is to run the something on another thread.
>>>>> 
>>>>> Kind regards,
>>>>>      Erik.
>>>>> 
>>>>> 
>>>>> Op 08-07-2023 om 19:17 schreef Colin McCabe:
>>>>>> On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
>>>>>>> Hi Colin,
>>>>>>> 
>>>>>>> Thanks for your thoughts and taking the time to reply.
>>>>>>> 
>>>>>>> Let me take away your concerns. None of your worries are an issue with
>>>>>>> the algorithm described in KIP-944. Here it goes:
>>>>>>> 
>>>>>>>   > It's not clear to me that it's safe to access the Kafka consumer or
>>>>>>>> producer concurrently from different threads.
>>>>>>> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
>>>>>>> to great lengths to make sure that this cannot happen.
>>>>>>> 
>>>>>>> *The only design goal is to allow callbacks to call the consumer from
>>>>>>> another thread.*
>>>>>>> 
>>>>>>> To make sure there are no more misunderstandings about this, I have
>>>>>>> added this goal to the KIP.
>>>>>>> 
>>>>>> Hi Erik,
>>>>>> 
>>>>>> Sorry, I spoke imprecisely. My concern is not concurrent access, but
>>>>> multithreaded access in general. Basically cache line visibility issues.
>>>>>>>   > This is true even if the accesses happen at different times,
>>>>> because
>>>>>>>> modern CPUs require memory barriers to guarantee inter-thread
>>>>> visibility
>>>>>>>> of loads and stores.
>>>>>>> In KIP-944, the callback thread can only delegate to another thread
>>>>>>> after reading from and writing to a threadlocal variable, providing the
>>>>>>> barriers right there.
>>>>>>> 
>>>>>> I don't see any documentation that accessing thread local variables
>>>>> provides a total store or load barrier. Do you have such documentation? It
>>>>> seems like if this were the case, we could eliminate volatile variables
>>>>> from most of the code base.
>>>>>>>   > I know that there are at least a few locks in the consumer code
>>>>> now,
>>>>>>>> due to our need to send heartbeats from a worker thread. I don't think
>>>>>>>> those would be sufficient to protect a client that is making calls
>>>>> from
>>>>>>>> random threads.
>>>>>>> In the current implementation the consumer is also invoked from random
>>>>>>> threads. If it works now, it should continue to work.
>>>>>>> 
>>>>>> I'm not sure what you're referring to. Can you expand on this?
>>>>>> 
>>>>>>>   > There has been some discussion of moving to a more traditional
>>>>> model
>>>>>>>> where people make calls to the client and the client passes the given
>>>>>>>> data to a single background worker thread. This would avoid a lot of
>>>>>>>> the footguns of the current model and probably better reflect how
>>>>> people
>>>>>>>> actually use the client.
>>>>>>> That is awesome. However, I'd rather not wait for that.
>>>>>>> 
>>>>>>>   > Another issue is that neither the producer nor the consumer is
>>>>> fully
>>>>>>>> nonblocking. There are some corner cases where we do in fact block.
>>>>> From
>>>>>>>> memory, the producer blocks in some "buffer full" cases, and the
>>>>>>>> consumer blocks sometimes when fetching metadata.
>>>>>>> I am aware of that. This is not an issue; all async runtimes can
>>>>>>> cooperate with blocking code.
>>>>>>> 
>>>>>> Hmm, not sure what you mean by "cooperate with blocking code." If you
>>>>> have 10 green threads you're multiplexing on to one CPU thread, and that
>>>>> CPU thread gets blocked because of what one green thread is doing, the
>>>>> other 9 green threads are blocked too, right? I guess it's "just" a
>>>>> performance problem, but it still seems like it could be a serious one.
>>>>>>>   > I suspect it would be more appropriate for Kotlin coroutines, Zio
>>>>>>>> coroutines and so on to adopt this "pass messages to and from a
>>>>>>>> background worker thread" model than to try to re-engineer the Kafka
>>>>>>>> client to work from random threads.
>>>>>>> In both zio-kafka and fs2-kafka this is already the approach we are
>>>>> taking.
>>>>>>> Unfortunately, the Kafka consumer forces us to perform some work in
>>>>>>> callbacks:
>>>>>>> 
>>>>>>>    * commit completed callback: register that the callback is complete,
>>>>>>>    * partition revoked callback: in this callback we need to submit
>>>>>>>      commits from everything consumed and processed so far, using
>>>>>>>      timeouts if processing takes too long. In an async runtime, this is
>>>>>>>      an inherently multi-threaded process. Especially, we cannot do
>>>>>>>      timeouts without involving multiple threads.
>>>>>>> 
>>>>>> I don't see why this has to be "inherently multi-threaded." Why can't
>>>>> we have the other threads report back what messages they've processed to
>>>>> the worker thread. Then it will be able to handle these callbacks without
>>>>> involving the other threads.
>>>>>> regards,
>>>>>> Colin
>>>>>> 
>>>>>>> I have extended the KIP's motivation to explain the major use case.
>>>>>>> 
>>>>>>> Please read KIP-944 again. Even though the description is extensive
>>>>>>> (this callback from callback stuff is tricky), you will find that my
>>>>>>> goals are modest.
>>>>>>> 
>>>>>>> Also the implementation is just a few lines. With understanding of the
>>>>>>> idea it should not be a lot of work to follow it.
>>>>>>> 
>>>>>>> Kind regards,
>>>>>>>       Erik.
>>>>>>> 
>>>>>>> 
>>>>>>> Op 07-07-2023 om 19:57 schreef Colin McCabe:
>>>>>>>> Hi Erik,
>>>>>>>> 
>>>>>>>> It's not clear to me that it's safe to access the Kafka consumer or
>>>>> producer concurrently from different threads. There are data structures
>>>>> that aren't protected by locks, so I wouldn't necessarily expect accessing
>>>>> and mutating them in a concurrent way to work. This is true even if the
>>>>> accesses happen at different times, because modern CPUs require memory
>>>>> barriers to guarantee inter-thread visibility of loads and stores.
>>>>>>>> I am writing this without doing a detailed dive into the code (I
>>>>> haven't been into the consumer / producer code in a bit.) Someone who has
>>>>> worked more on the consumer recently might be able to give specific
>>>>> examples of things that wouldn't work.
>>>>>>>> I know that there are at least a few locks in the consumer code now,
>>>>> due to our need to send heartbeats from a worker thread. I don't think
>>>>> those would be sufficient to protect a client that is making calls from
>>>>> random threads.
>>>>>>>> There has been some discussion of moving to a more traditional model
>>>>> where people make calls to the client and the client passes the given data
>>>>> to a single background worker thread. This would avoid a lot of the
>>>>> footguns of the current model and probably better reflect how people
>>>>> actually use the client.
>>>>>>>> Another issue is that neither the producer nor the consumer is fully
>>>>> nonblocking. There are some corner cases where we do in fact block. From
>>>>> memory, the producer blocks in some "buffer full" cases, and the consumer
>>>>> blocks sometimes when fetching metadata.
>>>>>>>> I suspect it would be more appropriate for Kotlin coroutines, Zio
>>>>> coroutines and so on to adopt this "pass messages to and from a background
>>>>> worker thread" model than to try to re-engineer the Kafka client to work
>>>>> from random threads.
>>>>>>>> There is actually some good advice about how to handle multiple
>>>>> threads in the KafkaConsumer.java header file itself. Check the sections
>>>>> "One Consumer Per Thread" and "Decouple Consumption and Processing." What
>>>>> I'm recommending here is essentially the latter.
>>>>>>>> I do understand that it's frustrating to not get a quick response.
>>>>> However, overall I think this one needs a lot more discussion before
>>>>> getting anywhere near a vote. I will leave a -1 just as a procedural step.
>>>>> Maybe some of the people working in the client area can also chime in.
>>>>>>>> best,
>>>>>>>> Colin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
>>>>>>>>> Dear PMCs,
>>>>>>>>> 
>>>>>>>>> So far there have been 0 responses to KIP-944. I understand this may
>>>>> not
>>>>>>>>> be something that keeps you busy, but this KIP is important to people
>>>>>>>>> that use async runtimes like Zio, Cats and Kotlin.
>>>>>>>>> 
>>>>>>>>> Is there anything you need to come to a decision?
>>>>>>>>> 
>>>>>>>>> Kind regards,
>>>>>>>>>        Erik.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Op 05-07-2023 om 11:38 schreef Erik van Oosten:
>>>>>>>>>> Hello all,
>>>>>>>>>> 
>>>>>>>>>> I'd like to call a vote on KIP-944 Support async runtimes in
>>>>> consumer.
>>>>>>>>>> It has been 'under discussion' for 7 days now. 'Under
>>>>> discussion'
>>>>>>>>>> between quotes, because there were 0 comments so far. I hope the KIP
>>>>>>>>>> is clear!
>>>>>>>>>> 
>>>>>>>>>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>>>>>>>>>> 
>>>>>>>>>> Kind regards,
>>>>>>>>>>       Erik.
>>>>>>>>>> 
>>>>>>>>>> 
> -- 
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com
> 
