Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-08 Thread Erik van Oosten

Hello Colin,

>> In KIP-944, the callback thread can only delegate to another thread 
after reading from and writing to a threadlocal variable, providing the 
barriers right there.


> I don't see any documentation that accessing thread local variables 
provides a total store or load barrier. Do you have such documentation? 
It seems like if this were the case, we could eliminate volatile 
variables from most of the code base.


Now, I was imprecise; the thread-locals are only somewhat involved. In 
the KIP proposal the callback thread reads an access key from a 
thread-local variable. It then needs to pass that access key to another 
thread, which can then set it on its own thread-local variable. The act 
of passing a value from one thread to another implies that a memory 
barrier is crossed. However, this is all not so relevant since there is 
no need to pass the access key back when the other thread is done.


But now that I think about it a bit more: the locking mechanism runs in 
a synchronized block. If I remember correctly, this should be enough to 
provide the read and write barriers.
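For what it's worth, the synchronized-block argument can be illustrated with a small sketch (all names hypothetical, not KIP-944 code): under the Java Memory Model, an unlock by one thread happens-before a subsequent lock of the same monitor by another thread, so a value written inside the block is visible to the next locker without being volatile.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: the callback thread publishes an access key under a
// lock; the delegate thread reads it under the same lock. The unlock/lock
// pair establishes the happens-before edge, so the field needs no volatile.
public class AccessKeyHandover {
    private final Object lock = new Object();
    private long accessKey; // guarded by lock, deliberately not volatile

    public void publish(long key) {
        synchronized (lock) { accessKey = key; }
    }

    public long read() {
        synchronized (lock) { return accessKey; }
    }

    public static void main(String[] args) throws InterruptedException {
        AccessKeyHandover h = new AccessKeyHandover();
        CountDownLatch published = new CountDownLatch(1);
        Thread callbackThread = new Thread(() -> {
            h.publish(42L); // e.g. a key previously read from a ThreadLocal
            published.countDown();
        });
        callbackThread.start();
        published.await(); // delegate thread waits for the handover
        System.out.println(h.read()); // 42 is visible via the lock's barrier
    }
}
```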


>> In the current implementation the consumer is also invoked from 
random threads. If it works now, it should continue to work.

> I'm not sure what you're referring to. Can you expand on this?

Any invocation of the consumer (e.g. the poll method) comes from a 
thread not managed by the consumer. This is what I assumed you meant by 
the term 'random thread'.


> Hmm, not sure what you mean by "cooperate with blocking code." If you 
have 10 green threads you're multiplexing on to one CPU thread, and that 
CPU thread gets blocked because of what one green thread is doing, the 
other 9 green threads are blocked too, right? I guess it's "just" a 
performance problem, but it still seems like it could be a serious one.


There are several ways to deal with this. All async runtimes I know 
(Akka, ZIO, Cats Effect) support this by letting you mark a task as 
blocking. The runtime will then either schedule it to another 
thread-pool, or it will grow the thread-pool to accommodate. In any case 
'the other 9 green threads' will simply be scheduled to another real 
thread. In addition, some of these runtimes detect long-running tasks 
and will reschedule waiting tasks to another thread. This is all a bit 
off topic though.
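The 'mark it blocking' idea can be sketched in plain Java (hypothetical pool setup; the real runtimes do this internally): blocking work is shipped to an elastic pool, so the small compute pool's threads stay free for the other green threads.

```java
import java.util.concurrent.*;

// Hypothetical sketch of offloading blocking tasks: a single-threaded
// "compute" pool stands in for the green-thread scheduler, and a cached
// pool absorbs the blocking calls.
public class BlockingOffload {
    static final ExecutorService compute = Executors.newFixedThreadPool(1);
    static final ExecutorService blocking = Executors.newCachedThreadPool();

    static <T> CompletableFuture<T> attemptBlocking(Callable<T> task) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        blocking.submit(() -> {
            try { cf.complete(task.call()); }
            catch (Exception e) { cf.completeExceptionally(e); }
        });
        return cf;
    }

    public static void main(String[] args) throws Exception {
        // A blocking call does not occupy the single compute thread...
        CompletableFuture<String> slow = attemptBlocking(() -> {
            Thread.sleep(200); // stands in for a blocking consumer call
            return "done";
        });
        // ...so compute work scheduled afterwards still runs immediately.
        String fast = compute.submit(() -> "fast").get();
        System.out.println(fast);
        System.out.println(slow.get());
        compute.shutdown();
        blocking.shutdown();
    }
}
```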


> I don't see why this has to be "inherently multi-threaded." Why can't 
we have the other threads report back what messages they've processed to 
the worker thread? Then it will be able to handle these callbacks 
without involving the other threads.


Please consider the context, which is that we are running inside the 
callback of the rebalance listener. The only way to execute something 
and also have a timeout on it is to run that something on another thread.
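A minimal sketch of that pattern (the commit logic is hypothetical): the work runs on a second thread, which is what lets the callback thread bound its wait with a timeout.

```java
import java.util.concurrent.*;

// Hypothetical sketch: inside a rebalance callback we cannot "wait with a
// timeout" on the callback thread alone; the work has to run on another
// thread so the callback thread can bound how long it waits.
public class CallbackTimeout {
    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<String> commits = worker.submit(() -> {
            Thread.sleep(100); // stands in for submitting pending commits
            return "committed";
        });
        try {
            // The callback thread blocks for at most 1 second, then gives up.
            System.out.println(commits.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            System.out.println("gave up waiting");
        } finally {
            worker.shutdownNow();
        }
    }
}
```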


Kind regards,
    Erik.


Op 08-07-2023 om 19:17 schreef Colin McCabe:

On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:

Hi Colin,

Thanks for your thoughts and taking the time to reply.

Let me take away your concerns. None of your worries are an issue with
the algorithm described in KIP-944. Here it goes:

  > It's not clear to me that it's safe to access the Kafka consumer or
producer concurrently from different threads.

Concurrent access is /not/ a design goal of KIP-944. In fact, it goes
to great lengths to make sure that this cannot happen.

*The only design goal is to allow callbacks to call the consumer from
another thread.*

To make sure there are no more misunderstandings about this, I have
added this goal to the KIP.


Hi Erik,

Sorry, I spoke imprecisely. My concern is not concurrent access, but 
multithreaded access in general. Basically cache line visibility issues.


  > This is true even if the accesses happen at different times, because
modern CPUs require memory barriers to guarantee inter-thread visibility
of loads and stores.

In KIP-944, the callback thread can only delegate to another thread
after reading from and writing to a threadlocal variable, providing the
barriers right there.


I don't see any documentation that accessing thread local variables provides a 
total store or load barrier. Do you have such documentation? It seems like if 
this were the case, we could eliminate volatile variables from most of the code 
base.


  > I know that there are at least a few locks in the consumer code now,
due to our need to send heartbeats from a worker thread. I don't think
those would be sufficient to protect a client that is making calls from
random threads.

In the current implementation the consumer is also invoked from random
threads. If it works now, it should continue to work.


I'm not sure what you're referring to. Can you expand on this?


  > There has been some discussion of moving to a more traditional model
where people make calls to the client and the client passes the given
data to a single background worker thread. This would avoid a lot of
the footguns of the current model and probably better 

[jira] [Created] (KAFKA-15168) Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-08 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15168:


 Summary: Handle overlapping remote log segments in 
RemoteLogMetadata cache
 Key: KAFKA-15168
 URL: https://issues.apache.org/jira/browse/KAFKA-15168
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


For a partition p0 and a given leader epoch, the remote log manager can upload 
duplicate segments due to a leader change. The RemoteLogMetadata cache should 
handle duplicate segments, which may otherwise affect followers and consumers.

(eg)

L0 uploaded the segment PZyYVdsJQWeBAdBDPqkcVA at t0 for offset range 10 - 90
L1 uploads the segment L5Ufv71IToiZYKgsluzcyA at t1 for offset range 5 - 100

In the RemoteLogLeaderEpochState class, the {{offsetToId}} is a navigable map. 
It sorts the entries by start-offset which keeps the state as:
{code:java}
(5 - 100) -> L5Ufv71IToiZYKgsluzcyA (T1)
(10 - 90) -> PZyYVdsJQWeBAdBDPqkcVA (T0){code}

For a fetch request with fetch-offset 92, the RemoteLogLeaderEpochState will 
return the segment PZyYVdsJQWeBAdBDPqkcVA instead of L5Ufv71IToiZYKgsluzcyA. 
The returned segment does not contain the requested offset, and an error is 
thrown back to the caller.
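The misdirected lookup can be reproduced with a plain NavigableMap (a sketch of the described behaviour, not the actual RemoteLogLeaderEpochState code):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// The map is keyed by segment start offset, as described above. A floor
// lookup for fetch-offset 92 picks the entry with the greatest start
// offset <= 92, which is the shorter T0 segment (10 - 90), even though
// the T1 segment (5 - 100) is the one that actually covers offset 92.
public class OverlapLookup {
    public static void main(String[] args) {
        NavigableMap<Long, String> offsetToId = new TreeMap<>();
        offsetToId.put(5L, "L5Ufv71IToiZYKgsluzcyA");  // covers 5 - 100 (T1)
        offsetToId.put(10L, "PZyYVdsJQWeBAdBDPqkcVA"); // covers 10 - 90 (T0)

        // Prints PZyYVdsJQWeBAdBDPqkcVA, whose range ends at offset 90.
        System.out.println(offsetToId.floorEntry(92L).getValue());
    }
}
```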



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15167) Tiered Storage Test Harness Framework

2023-07-08 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15167:


 Summary: Tiered Storage Test Harness Framework
 Key: KAFKA-15167
 URL: https://issues.apache.org/jira/browse/KAFKA-15167
 Project: Kafka
  Issue Type: Sub-task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


Base class for integration tests exercising the tiered storage functionality in 
Kafka.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager

2023-07-08 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15166:


 Summary: Add deletePartition API to the RemoteStorageManager
 Key: KAFKA-15166
 URL: https://issues.apache.org/jira/browse/KAFKA-15166
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


Remote Storage Manager exposes the {{deleteLogSegmentData}} API to delete 
individual log segments. Storage providers such as HDFS have support to delete 
a directory. Having a {{deletePartition}} API to delete the data at the 
partition level would enhance topic deletion.

This task may require a KIP as it touches the user-facing APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1982

2023-07-08 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-15165) Handle Kafka client certificate failures without impacting brokers

2023-07-08 Thread Sandeep (Jira)
Sandeep created KAFKA-15165:
---

 Summary: Handle Kafka client certificate failures without 
impacting brokers
 Key: KAFKA-15165
 URL: https://issues.apache.org/jira/browse/KAFKA-15165
 Project: Kafka
  Issue Type: Improvement
  Components: core, security
Affects Versions: 2.8.1
 Environment: production
Reporter: Sandeep


The following situation was observed in production:

Consumer or producer SSL certificates have expired due to mismanagement of 
certificate renewal. When these clients connect to either read or publish 
messages, they get authentication failures. These clients keep retrying, and 
this drives up broker CPU utilisation, which impacts other healthy clients 
connected to the brokers.

CPU utilisation was observed increasing from 35% to 85-90%. Clients which are 
healthy see a spike in publish and consume latencies upwards of multiple 
seconds.

This kind of situation creates a denial-of-service-like attack on the Kafka 
cluster.

We must handle this gracefully, by either:

1) Not allowing clients to retry, or having them do exponential retries, after 
they fail to authenticate using SSL certs

2) Broker-side changes, where the broker can blacklist clients for a certain 
duration, which can be lifted after the certs are renewed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-928: Making Kafka resilient to log directories becoming full

2023-07-08 Thread Colin McCabe
On Wed, Jun 7, 2023, at 07:07, Christo Lolov wrote:
> Hey Colin,
>
> I tried the following setup:
>
> * Create 3 EC2 machines.
> * EC2 machine named A acts as a KRaft Controller.
> * EC2 machine named B acts as a KRaft Broker. (The only configurations
> different to the default values: log.retention.ms=3,
> log.segment.bytes=1048576, log.retention.check.interval.ms=3,
> leader.imbalance.check.interval.seconds=30)
> * EC2 machine named C acts as a Producer.
> * I attached 1 GB EBS volume to the EC2 machine B (Broker) and 
> configured
> the log.dirs to point to it.
> * I filled 995 MB of that EBS volume using fallocate.
> * I created a topic with 6 partitions and a replication factor of 1.
> * From the Producer machine I used 
> `~/kafka/bin/kafka-producer-perf-test.sh
> --producer.config ~/kafka/config/client.properties --topic batman
> --record-size 524288 --throughput 5 --num-records 150`. The disk on EC2
> machine B filled up and the broker shut down. I stopped the producer.
> * I stopped the controller on EC2 machine A. I started the controller to
> both be a controller and a broker (I need this because I cannot 
> communicate
> directly with a controller -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum
> ).
> * I deleted the topic to which I had been writing by using 
> kafka-topics.sh .
> * I started the broker on EC2 machine B and it failed due to no space 
> left
> on disk during its recovery process. The topic was not deleted from the
> disk.
>
> As such, I am not convinced that KRaft addresses the problem of deleting
> topics on startup if there is no space left on the disk - is there
> something wrong with my setup that you disagree with? I think this will
> continue to be the case even when JBOD + KRaft is implemented.

Thank you for trying this. You're right that it doesn't work today, but it very 
easily could with no architecture changes.

We have the initial KRaft metadata load when log recovery starts. So if we 
wanted to, we could delete non-existent topics during log recovery. (Obviously 
we'd want to check both the topic ID and topic name, as always.)

This would be a good optimization in general. Spending time recovering a 
directory and then immediately deleting it during the initial metadata load is 
silly. I think nobody has bothered to optimize this yet since it's a bit of a 
rare case. But we very easily could.

I don't know if this would require a KIP or not. Arguably it's not user-visible 
behavior.

best,
Colin

>
> Let me know your thoughts!
>
> Best,
> Christo
>
> On Mon, 5 Jun 2023 at 11:03, Christo Lolov  wrote:
>
>> Hey Colin,
>>
>> Thanks for the review!
>>
>> I am also skeptical that much space can be reclaimed via compaction as
>> detailed in the limitations section of the KIP.
>>
>> In my head there are two ways to get out of the saturated state -
>> configure more aggressive retention and delete topics. I wasn't aware that
>> KRaft deletes topics marked for deletion on startup if the disks occupied
>> by those partitions are full - I will check it out, thank you for the
>> information! On the retention side, I believe there is still a benefit in
>> keeping the broker up and responsive - in my experience, people first try
>> to reduce the data they have, and only when that also does not work are they
>> okay with sacrificing all of the data.
>>
>> Let me know your thoughts!
>>
>> Best,
>> Christo
>>
>> On Fri, 2 Jun 2023 at 20:09, Colin McCabe  wrote:
>>
>>> Hi Christo,
>>>
>>> We're not adding new stuff to ZK at this point (it's deprecated), so it
>>> would be good to drop that from the design.
>>>
>>> With regard to the "saturated" state: I'm skeptical that compaction could
>>> really move the needle much in terms of freeing up space -- in most
>>> workloads I've seen, it wouldn't. Compaction also requires free space to
>>> function as well.
>>>
>>> So the main benefit of the "saturated" state seems to be enabling deletion
>>> on full disks. But KRaft mode already has most of that benefit. Full disks
>>> (or, indeed, downed brokers) don't block deletion on KRaft. If you delete a
>>> topic and then bounce the broker that had the disk full, it will delete the
>>> topic directory on startup as part of its snapshot load process.
>>>
>>> So I'm not sure if we really need this. Maybe we should re-evaluate once
>>> we have JBOD + KRaft.
>>>
>>> best,
>>> Colin
>>>
>>>
>>> On Mon, May 22, 2023, at 02:23, Christo Lolov wrote:
>>> > Hello all!
>>> >
>>> > I would like to start a discussion on KIP-928: Making Kafka resilient to
>>> > log directories becoming full which can be found at
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-928%3A+Making+Kafka+resilient+to+log+directories+becoming+full
>>> > .
>>> >
>>> > In summary, I frequently run into problems where Kafka becomes
>>> unresponsive
>>> > when the disks backing its log directories become full. Such
>>> > 

Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-08 Thread Colin McCabe
On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote:
> Hi Colin,
>
> Thanks for your thoughts and taking the time to reply.
>
> Let me take away your concerns. None of your worries are an issue with 
> the algorithm described in KIP-944. Here it goes:
>
>  > It's not clear to me that it's safe to access the Kafka consumer or 
> > producer concurrently from different threads.
>
> Concurrent access is /not/ a design goal of KIP-944. In fact, it goes 
> to great lengths to make sure that this cannot happen.
>
> *The only design goal is to allow callbacks to call the consumer from 
> another thread.*
>
> To make sure there are no more misunderstandings about this, I have 
> added this goal to the KIP.
>

Hi Erik,

Sorry, I spoke imprecisely. My concern is not concurrent access, but 
multithreaded access in general. Basically cache line visibility issues.

>  > This is true even if the accesses happen at different times, because 
> > modern CPUs require memory barriers to guarantee inter-thread visibility 
> > of loads and stores.
>
> In KIP-944, the callback thread can only delegate to another thread 
> after reading from and writing to a threadlocal variable, providing the 
> barriers right there.
>

I don't see any documentation that accessing thread local variables provides a 
total store or load barrier. Do you have such documentation? It seems like if 
this were the case, we could eliminate volatile variables from most of the code 
base.

>  > I know that there are at least a few locks in the consumer code now, 
> > due to our need to send heartbeats from a worker thread. I don't think 
> > those would be sufficient to protect a client that is making calls from 
> > random threads.
>
> In the current implementation the consumer is also invoked from random 
> threads. If it works now, it should continue to work.
>

I'm not sure what you're referring to. Can you expand on this?

>  > There has been some discussion of moving to a more traditional model 
> > where people make calls to the client and the client passes the given 
> > data to a single background worker thread. This would avoid a lot of 
> > the footguns of the current model and probably better reflect how people 
> > actually use the client.
>
> That is awesome. However, I'd rather not wait for that.
>
>  > Another issue is that neither the producer nor the consumer is fully 
> > nonblocking. There are some corner cases where we do in fact block. From 
> > memory, the producer blocks in some "buffer full" cases, and the 
> > consumer blocks sometimes when fetching metadata.
>
> I am aware of that. This is not an issue; all async runtimes can 
> cooperate with blocking code.
>

Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 
green threads you're multiplexing on to one CPU thread, and that CPU thread 
gets blocked because of what one green thread is doing, the other 9 green 
threads are blocked too, right? I guess it's "just" a performance problem, but 
it still seems like it could be a serious one.

>  > I suspect it would be more appropriate for Kotlin coroutines, Zio 
> > coroutines and so on to adopt this "pass messages to and from a 
> > background worker thread" model than to try to re-engineer the Kafka 
> > client to work from random threads.
>
> In both zio-kafka and fs2-kafka this is already the approach we are taking.
>
> Unfortunately, the Kafka consumer forces us to perform some work in 
> callbacks:
>
>   * commit completed callback: register that the callback is complete,
>   * partition revoked callback: in this callback we need to submit
> commits from everything consumed and processed so far, using
> timeouts if processing takes too long. In an async runtime, this is
> an inherently multi-threaded process. Especially, we cannot do
> timeouts without involving multiple threads.
>

I don't see why this has to be "inherently multi-threaded." Why can't we have 
the other threads report back what messages they've processed to the worker 
thread? Then it will be able to handle these callbacks without involving the 
other threads.
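The single-worker model suggested here can be sketched in a few lines (hypothetical types and offsets): processing threads only enqueue what they finished, and the worker thread, the only thread that would ever touch the consumer, drains the queue.

```java
import java.util.concurrent.*;

// Hypothetical sketch: other threads report completed offsets into a
// queue; the worker thread drains it (e.g. inside a rebalance callback)
// and could then commit the highest offset itself, with no cross-thread
// consumer access.
public class ProcessedReport {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Long> processedOffsets = new LinkedBlockingQueue<>();

        // Processing threads report completion without touching the consumer.
        ExecutorService processors = Executors.newFixedThreadPool(2);
        processors.submit(() -> processedOffsets.add(10L));
        processors.submit(() -> processedOffsets.add(20L));
        processors.shutdown();
        processors.awaitTermination(1, TimeUnit.SECONDS);

        // The worker thread drains the reports and computes what to commit.
        long highest = 0;
        Long next;
        while ((next = processedOffsets.poll()) != null) {
            highest = Math.max(highest, next);
        }
        System.out.println(highest);
    }
}
```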

regards,
Colin

> I have extended the KIP's motivation to explain the major use case.
>
> Please read KIP-944 again. Even though the description is extensive 
> (this callback from callback stuff is tricky), you will find that my 
> goals are modest.
>
> Also the implementation is just a few lines. With understanding of the 
> idea it should not be a lot of work to follow it.
>
> Kind regards,
>      Erik.
>
>
> Op 07-07-2023 om 19:57 schreef Colin McCabe:
>> Hi Erik,
>>
>> It's not clear to me that it's safe to access the Kafka consumer or producer 
>> concurrently from different threads. There are data structures that aren't 
>> protected by locks, so I wouldn't necessarily expect accessing and mutating 
>> them in a concurrent way to work. This is true even if the accesses happen 
>> at different times, because modern CPUs require memory barriers to 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1981

2023-07-08 Thread Apache Jenkins Server
See 




Re: Apache Kafka 3.6.0 release

2023-07-08 Thread Satish Duggana
Hi Yash,
Thanks for the update. Added KIP-793 to the release plan. Please feel
free to update the release wiki with any other updates on the KIP.

~Satish.

On Fri, 7 Jul 2023 at 10:52, Yash Mayya  wrote:
>
> Hi Satish,
>
> KIP-793 [1] just passed voting and we should be able to wrap up the
> implementation in time for the 3.6.0 feature freeze. Could we add it to the
> release plan?
>
> [1] -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
>
> Thanks,
> Yash
>
> On Mon, Jun 12, 2023 at 3:52 PM Satish Duggana 
> wrote:
>
> > Hi,
> > I have created a release plan for Apache Kafka version 3.6.0 on the
> > wiki. You can access the release plan and all related information by
> > following this link:
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.6.0
> >
> > The release plan outlines the key milestones and important dates for
> > version 3.6.0. Currently, the following dates have been set for the
> > release:
> >
> > KIP Freeze: 26th July 23
> > Feature Freeze : 16th Aug 23
> > Code Freeze : 30th Aug 23
> >
> > Please review the release plan and provide any additional information
> > or updates regarding KIPs targeting version 3.6.0. If you have
> > authored any KIPs that are missing a status or if there are incorrect
> > status details, please make the necessary updates and inform me so
> > that I can keep the plan accurate and up to date.
> >
> > Thanks,
> > Satish.
> >
> > On Mon, 17 Apr 2023 at 21:17, Luke Chen  wrote:
> > >
> > > Thanks for volunteering!
> > >
> > > +1
> > >
> > > Luke
> > >
> > > On Mon, Apr 17, 2023 at 2:03 AM Ismael Juma  wrote:
> > >
> > > > Thanks for volunteering Satish. +1.
> > > >
> > > > Ismael
> > > >
> > > > On Sun, Apr 16, 2023 at 10:08 AM Satish Duggana <
> > satish.dugg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I would like to volunteer as release manager for the next release,
> > > > > which will be Apache Kafka 3.6.0.
> > > > >
> > > > > If there are no objections, I will start a release plan a week after
> > > > > 3.5.0 release(around early May).
> > > > >
> > > > > Thanks,
> > > > > Satish.
> > > > >
> > > >
> >


[jira] [Resolved] (KAFKA-15157) Print startup time for RemoteIndexCache

2023-07-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15157.
--
Resolution: Fixed

> Print startup time for RemoteIndexCache
> ---
>
> Key: KAFKA-15157
> URL: https://issues.apache.org/jira/browse/KAFKA-15157
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Major
> Fix For: 3.6.0
>
>
> When RemoteIndexCache starts up, it will try to re-build the in-memory cache 
> using the files already present on the disk in the remote-index-cache 
> directory. The process involves:
> 1. deleting existing files which are pending delete i.e. have a .delete suffix
> 2. read the cached index files, if present.
> 3. creating the indexes (this step will create a MMapped'buffer)
> 4. perform sanity check on the indexes
> 5. add to internal cache
> Steps 2-5 are bounded by the maximum number of entries in the cache, but 
> step 1 could be arbitrarily large.
> To debug a slow cache startup, we should add an info statement that prints the 
> time it took to initialize the cache.
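The requested log line amounts to something like the following sketch (hypothetical class name; not the actual RemoteIndexCache code):

```java
// Hypothetical sketch: wrap the cache rebuild with a nanoTime measurement
// and print the elapsed time once initialization completes.
public class CacheInitTiming {
    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(50); // stands in for steps 1-5 of the cache rebuild
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("RemoteIndexCache initialized in " + elapsedMs + " ms");
    }
}
```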



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-08 Thread Erik van Oosten

Hi Colin,

Thanks for your thoughts and taking the time to reply.

Let me take away your concerns. None of your worries are an issue with 
the algorithm described in KIP-944. Here it goes:


> It's not clear to me that it's safe to access the Kafka consumer or 
producer concurrently from different threads.


Concurrent access is /not/ a design goal of KIP-944. In fact, it goes 
to great lengths to make sure that this cannot happen.


*The only design goal is to allow callbacks to call the consumer from 
another thread.*


To make sure there are no more misunderstandings about this, I have 
added this goal to the KIP.


> This is true even if the accesses happen at different times, because 
modern CPUs require memory barriers to guarantee inter-thread visibility 
of loads and stores.


In KIP-944, the callback thread can only delegate to another thread 
after reading from and writing to a threadlocal variable, providing the 
barriers right there.


> I know that there are at least a few locks in the consumer code now, 
due to our need to send heartbeats from a worker thread. I don't think 
those would be sufficient to protect a client that is making calls from 
random threads.


In the current implementation the consumer is also invoked from random 
threads. If it works now, it should continue to work.


> There has been some discussion of moving to a more traditional model 
where people make calls to the client and the client passes the given 
data to a single background worker thread. This would avoid a lot of 
the footguns of the current model and probably better reflect how people 
actually use the client.


That is awesome. However, I'd rather not wait for that.

> Another issue is that neither the producer nor the consumer is fully 
nonblocking. There are some corner cases where we do in fact block. From 
memory, the producer blocks in some "buffer full" cases, and the 
consumer blocks sometimes when fetching metadata.


I am aware of that. This is not an issue; all async runtimes can 
cooperate with blocking code.


> I suspect it would be more appropriate for Kotlin coroutines, Zio 
coroutines and so on to adopt this "pass messages to and from a 
background worker thread" model than to try to re-engineer the Kafka 
client to work from random threads.


In both zio-kafka and fs2-kafka this is already the approach we are taking.

Unfortunately, the Kafka consumer forces us to perform some work in 
callbacks:


 * commit completed callback: register that the callback is complete,
 * partition revoked callback: in this callback we need to submit
   commits from everything consumed and processed so far, using
   timeouts if processing takes too long. In an async runtime, this is
   an inherently multi-threaded process. Especially, we cannot do
   timeouts without involving multiple threads.

I have extended the KIP's motivation to explain the major use case.

Please read KIP-944 again. Even though the description is extensive 
(this callback from callback stuff is tricky), you will find that my 
goals are modest.


Also the implementation is just a few lines. With understanding of the 
idea it should not be a lot of work to follow it.


Kind regards,
    Erik.


Op 07-07-2023 om 19:57 schreef Colin McCabe:

Hi Erik,

It's not clear to me that it's safe to access the Kafka consumer or producer 
concurrently from different threads. There are data structures that aren't 
protected by locks, so I wouldn't necessarily expect accessing and mutating 
them in a concurrent way to work. This is true even if the accesses happen at 
different times, because modern CPUs require memory barriers to guarantee 
inter-thread visibility of loads and stores.

I am writing this without doing a detailed dive into the code (I haven't 
been into the consumer / producer code in a bit.) Someone who has worked more 
on the consumer recently might be able to give specific examples of things that 
wouldn't work.

I know that there are at least a few locks in the consumer code now, due to our 
need to send heartbeats from a worker thread. I don't think those would be 
sufficient to protect a client that is making calls from random threads.

There has been some discussion of moving to a more traditional model where 
people make calls to the client and the client passes the given data to a 
single background worker thread. This would avoid a lot of the footguns of the 
current model and probably better reflect how people actually use the client.

Another issue is that neither the producer nor the consumer is fully nonblocking. There 
are some corner cases where we do in fact block. From memory, the producer blocks in some 
"buffer full" cases, and the consumer blocks sometimes when fetching metadata.

I suspect it would be more appropriate for Kotlin coroutines, Zio coroutines and so on to 
adopt this "pass messages to and from a background worker thread" model than 
to try to re-engineer the Kafka client to work from