Re: Is the KafkaStreams#store() method thread-safe?

2024-01-02 Thread Sophie Blee-Goldman
I'll give you two answers here: theoretical and practical.

First up, theoretically there is unfortunately no way to work around this
issue, even if you restrict your access to the KafkaStreams#store API to
the thread that created the KafkaStreams object. This is because we currently have
write locking on this map, but no read locking, and the thread that created
the KafkaStreams object (and any user threads under your application's
control) would be a reader with unprotected access to the map, whereas the
"writers" here are internal StreamThreads that can modify this map at any
time. So in theory, you can hit a ConcurrentModificationException any time
a reader thread attempts to iterate that map while a REPLACE_THREAD event
causes a StreamThread to modify it in the background.
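To make that failure mode concrete, here is a minimal, single-threaded sketch in plain Java with no Kafka classes. The map and its entries are hypothetical stand-ins for the storeProviders map. The "writer" step runs inside the reader's loop so that HashMap's fail-fast iterator reliably throws. In a real cross-thread race, the HashMap javadoc only promises that detection is best-effort, so the same bug may show up as a ConcurrentModificationException or may go undetected.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the race: the map plays the role of the
// storeProviders map, and the "writer" step is run inside the reader's loop
// so the failure is deterministic instead of timing-dependent.
public class HashMapIterationRace {

    // Reader iterates values(); partway through, the map is structurally
    // modified the way a StreamThread registering a replacement might do.
    static boolean readWhileWriting(Map<String, String> providers) {
        try {
            int seen = 0;
            for (String ignored : providers.values()) {
                seen++;
                if (seen == 1) {
                    providers.put("replacement-thread", "provider-replacement");
                }
            }
            return true; // iteration finished without detecting the change
        } catch (ConcurrentModificationException e) {
            return false; // HashMap's fail-fast iterator detected the modification
        }
    }

    public static void main(String[] args) {
        Map<String, String> providers = new HashMap<>();
        providers.put("thread-1", "provider-1");
        providers.put("thread-2", "provider-2");
        providers.put("thread-3", "provider-3");
        // prints "iteration completed: false"
        System.out.println("iteration completed: " + readWhileWriting(providers));
    }
}
```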

It's actually a bit worse than it seems, because the actual *access* to the
map isn't even confined to the KafkaStreams#store call -- all that does is
pass a view of the storeProviders map's values() along to the
WrappingStoreProvider class, which then iterates over the storeProviders on
each query to the store you got back from KafkaStreams#store (in other
words, the store you get from this API is technically a composite store
that wraps a collection of the literal StateStore instances used by the
application topology). All that is to say, you don't actually have any
thread-safety issues with the KafkaStreams#store call itself, but you might
hit this ConcurrentModificationException any time you query the store it
returns.
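The "composite over a live view" point can be sketched as follows, assuming the real WrappingStoreProvider behaves as described above. CompositeStore and LocalStore are hypothetical stand-ins, not Kafka Streams classes. The sketch also bears on the question of sharing only the ReadOnlyWindowStore instance: because the composite walks the map again on every query, it does see threads being added or replaced, but for the same reason it inherits the race.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the shape described above; these are NOT the real
// Kafka Streams classes (WrappingStoreProvider, composite stores, etc.).
public class CompositeStoreSketch {

    // Stand-in for one StreamThread's local StateStore.
    static final class LocalStore {
        private final Map<String, Long> data;

        LocalStore(Map<String, Long> data) {
            this.data = data;
        }

        Optional<Long> get(String key) {
            return Optional.ofNullable(data.get(key));
        }
    }

    // Stand-in for the store returned by store(): it holds the live values()
    // view, not a copy, and walks it again on every query.
    static final class CompositeStore {
        private final Collection<LocalStore> liveView;

        CompositeStore(Collection<LocalStore> liveView) {
            this.liveView = liveView;
        }

        Optional<Long> get(String key) {
            for (LocalStore store : liveView) { // the map is iterated here, per query
                Optional<Long> hit = store.get(key);
                if (hit.isPresent()) {
                    return hit;
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Map<String, LocalStore> providersByThread = new HashMap<>();
        providersByThread.put("thread-1", new LocalStore(Map.of("a", 1L)));

        CompositeStore composite = new CompositeStore(providersByThread.values());
        System.out.println(composite.get("b")); // prints Optional.empty

        // A later change to the map (e.g. a replaced thread) is visible through
        // the same composite instance, because it only holds a view.
        providersByThread.put("thread-2", new LocalStore(Map.of("b", 2L)));
        System.out.println(composite.get("b")); // prints Optional[2]
    }
}
```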

Now for my second answer: despite all that, I think you should just go for
it and use the API as if it were thread-safe. It should be exceedingly
rare to run into this race condition, and you can protect yourself from it
completely by catching any ConcurrentModificationExceptions and retrying
the query. Even if you do get one, it should be transient and the store
query should succeed on the retry. Frankly, if it doesn't succeed on the
first retry, then you have bigger problems, since it probably indicates that
your stream threads are failing continuously.
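A minimal sketch of that catch-and-retry approach, assuming the whole query is wrapped in a Supplier. The helper name queryWithRetry and the attempt limit are illustrative choices, not Kafka Streams APIs. Only ConcurrentModificationException is retried; anything else propagates immediately.

```java
import java.util.ConcurrentModificationException;
import java.util.function.Supplier;

// Illustrative helper (hypothetical name), not part of Kafka Streams.
public final class RetryingQuery {

    private RetryingQuery() {}

    // Runs the query, retrying only on ConcurrentModificationException.
    // Other exceptions propagate immediately; after maxAttempts the last CME is rethrown.
    public static <T> T queryWithRetry(Supplier<T> query, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConcurrentModificationException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (ConcurrentModificationException e) {
                last = e; // transient race with a thread being added or removed; try again
            }
        }
        throw last; // non-null here: every attempt threw
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated query that hits the race once, then succeeds.
        String result = queryWithRetry(() -> {
            if (calls[0]++ == 0) {
                throw new ConcurrentModificationException("simulated race");
            }
            return "window-store-value";
        }, 3);
        // prints "window-store-value after 2 calls"
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

In an application, the supplier could call `streams.store(StoreQueryParameters.fromNameAndType(...))` with `QueryableStoreTypes.windowStore()` and then `fetch(key, timeFrom, timeTo)` on the result. Since `fetch` returns a `WindowStoreIterator` that should be closed, reading the results and closing the iterator inside the supplier is a simple way to make each attempt self-contained.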

Hope this helps!

On Wed, Dec 27, 2023 at 2:52 PM Kohei Nozaki  wrote:

> Hi Sophie, thank you so much for sharing that. It all makes sense to me.
>
> Unfortunately my application uses REPLACE_THREAD, so it seems like I need
> a workaround for this until this thread-safety issue is fixed. As I raised
> in my first email, would sharing only the ReadOnlyWindowStore instance with
> other threads be a workaround for this? Would the store object here be able
> to capture the changes that would be made by rebalancing?
>
> I've filed a ticket here (I'm interested in submitting a patch, but I
> cannot make any commitment):
> https://issues.apache.org/jira/browse/KAFKA-16055
>
> Regards,
> Kohei

Re: Is the KafkaStreams#store() method thread-safe?

2023-12-31 Thread Martin Gainty
You need to wrap the HashMap with a ConcurrentHashMap implementation; otherwise you
will receive no thread notifications.


From: Kohei Nozaki 
Sent: Wednesday, December 27, 2023 5:52:02 PM
To: users@kafka.apache.org 
Subject: Re: Is the KafkaStreams#store() method thread-safe?

Hi Sophie, thank you so much for sharing that. It all makes sense to me.

Unfortunately my application uses REPLACE_THREAD, so it seems like I need a 
workaround for this until this thread-safety issue is fixed. As I raised in my
first email, would sharing only the ReadOnlyWindowStore instance with other 
threads be a workaround for this? Would the store object here be able to 
capture the changes that would be made by rebalancing?

I've filed a ticket here (I'm interested in submitting a patch, but I cannot 
make any commitment): https://issues.apache.org/jira/browse/KAFKA-16055

Regards,
Kohei


Re: Is the KafkaStreams#store() method thread-safe?

2023-12-27 Thread Kohei Nozaki
Hi Sophie, thank you so much for sharing that. It all makes sense to me.

Unfortunately my application uses REPLACE_THREAD, so it seems like I need a 
workaround for this until this thread-safety issue is fixed. As I raised in my
first email, would sharing only the ReadOnlyWindowStore instance with other 
threads be a workaround for this? Would the store object here be able to 
capture the changes that would be made by rebalancing?

I've filed a ticket here (I'm interested in submitting a patch, but I cannot 
make any commitment): https://issues.apache.org/jira/browse/KAFKA-16055

Regards,
Kohei


> On Dec 27, 2023, at 5:43, Sophie Blee-Goldman  wrote:
> 
> Hey Kohei,
> 
> Good question -- I don't think there's exactly a short answer to this
> seemingly simple question so bear with me for a second.
> 
> My understanding is that KafkaStreams#store is very much intended to be
> thread-safe, and would have been back when it was first added a long time
> ago, and the javadocs should probably be updated to reflect that.
> 
> That said, you are totally right that whatever the intention, it is
> technically not completely thread-safe anymore, since the storeProviders map
> can be mutated when threads are added or removed. Of course, as long as you
> are not adding or removing StreamThreads in your application, it should be
> effectively thread-safe (but note: adding or removing threads includes using
> the REPLACE_THREAD option with the StreamsUncaughtExceptionHandler).
> 
> We should go ahead and fix this of course. I'm pretty sure we can just
> change the HashMap to a ConcurrentHashMap and be done with it -- there's
> already locking around the actual map modifications with the
> "changeThreadCount" lock in KafkaStreams, so we just need to make sure we
> don't accidentally hit a ConcurrentModificationException by accessing this
> map while it's being modified.
> 
> Would you mind submitting a JIRA ticket
> <https://issues.apache.org/jira/projects/KAFKA/issues> for this bug you
> found? And would you be interested in submitting a patch yourself?
> 
> Thanks!
> Sophie


Re: Is the KafkaStreams#store() method thread-safe?

2023-12-26 Thread Sophie Blee-Goldman
Hey Kohei,

Good question -- I don't think there's exactly a short answer to this
seemingly simple question so bear with me for a second.

My understanding is that KafkaStreams#store is very much intended to be
thread-safe, and would have been back when it was first added a long time
ago, and the javadocs should probably be updated to reflect that.

That said, you are totally right that whatever the intention, it is
technically not completely thread-safe anymore, since the storeProviders map
can be mutated when threads are added or removed. Of course, as long as you
are not adding or removing StreamThreads in your application, it should be
effectively thread-safe (but note: adding or removing threads includes using
the REPLACE_THREAD option with the StreamsUncaughtExceptionHandler).

We should go ahead and fix this of course. I'm pretty sure we can just
change the HashMap to a ConcurrentHashMap and be done with it -- there's
already locking around the actual map modifications with the
"changeThreadCount" lock in KafkaStreams, so we just need to make sure we
don't accidentally hit a ConcurrentModificationException by accessing this
map while it's being modified.
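A rough sketch of that proposed fix, assuming a simplified String-to-String map in place of the real storeProviders map. Writers serialize on a lock object (named after the "changeThreadCount" lock mentioned above, but otherwise hypothetical), while readers iterate a ConcurrentHashMap with no lock at all. This is not the actual Kafka Streams code or the KAFKA-16055 patch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry illustrating "locked writers, lock-free readers".
public class StoreProviderRegistry {

    private final Map<String, String> providersByThread = new ConcurrentHashMap<>();
    private final Object changeThreadCount = new Object(); // writer-side lock only

    void addThread(String threadName) {
        synchronized (changeThreadCount) {
            providersByThread.put(threadName, "provider-for-" + threadName);
        }
    }

    void removeThread(String threadName) {
        synchronized (changeThreadCount) {
            providersByThread.remove(threadName);
        }
    }

    // Lock-free read: ConcurrentHashMap iterators are weakly consistent and
    // never throw ConcurrentModificationException.
    int countProviders() {
        int count = 0;
        for (String ignored : providersByThread.values()) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        StoreProviderRegistry registry = new StoreProviderRegistry();
        registry.addThread("thread-1");

        // Background "writer" repeatedly adds and removes a thread, like REPLACE_THREAD would.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                registry.addThread("replacement-" + i);
                registry.removeThread("replacement-" + i);
            }
        });
        writer.start();

        // Foreground "reader" keeps iterating while the writer runs.
        long reads = 0;
        while (writer.isAlive()) {
            registry.countProviders();
            reads++;
        }
        writer.join();
        System.out.println("reads without CME: " + reads
                + ", providers left: " + registry.countProviders());
    }
}
```

The trade-off is that weakly consistent iteration never throws, but a query that overlaps a thread replacement may or may not see the provider being added or removed.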

Would you mind submitting a JIRA ticket
<https://issues.apache.org/jira/projects/KAFKA/issues> for this bug you
found? And would you be interested in submitting a patch yourself?

Thanks!
Sophie

On Fri, Dec 22, 2023 at 6:55 PM Kohei Nozaki  wrote:

> Hello, I have Kafka Streams questions related to thread safety.
>
> In my Kafka Streams application, I have the following two threads:
>
> * Thread A: this creates a Topology object including state stores and
> everything and then eventually calls the constructor of the KafkaStreams
> class and the start() method.
>
> * Thread B: this has a reference to the KafkaStreams object that thread A
> created. This periodically calls KafkaStreams#store on the object, gets a
> ReadOnlyWindowStore instance and reads the data in the store for monitoring
> purposes.
>
> I'm wondering if what my app does is OK in terms of thread safety. I'm
> not so worried about ReadOnlyWindowStore because the javadoc says:
> “Implementations should be thread-safe as concurrent reads and writes are
> expected.”
>
> But as for KafkaStreams#store, I'm not so sure it is OK to call it from
> separate threads. One thing that concerns me is that it touches a HashMap,
> which is not thread-safe, here:
> https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
> A KafkaStreams#removeStreamThread() call can mutate this HashMap object.
> Given that, I'm not so sure it is designed to be thread-safe.
>
> My questions here: is it OK to call KafkaStreams#store from a thread that
> is different from the one that instantiated the KafkaStreams object? Or
> would it be better to call the store() method in the same thread and
> share only the ReadOnlyWindowStore instance with other threads? If so,
> would the store object be able to capture the changes that would be
> made by rebalancing? Is the KafkaStreams class designed to be thread-safe at
> all?
>
> Regards,
> Kohei
>


Is the KafkaStreams#store() method thread-safe?

2023-12-22 Thread Kohei Nozaki
Hello, I have Kafka Streams questions related to thread safety.

In my Kafka Streams application, I have the following two threads:

* Thread A: this creates a Topology object including state stores and
everything and then eventually calls the constructor of the KafkaStreams class
and the start() method.

* Thread B: this has a reference to the KafkaStreams object that thread A
created. This periodically calls KafkaStreams#store on the object, gets a
ReadOnlyWindowStore instance and reads the data in the store for monitoring
purposes.

I'm wondering if what my app does is OK in terms of thread safety. I'm not so
worried about ReadOnlyWindowStore because the javadoc says: “Implementations
should be thread-safe as concurrent reads and writes are expected.”

But as for KafkaStreams#store, I'm not so sure it is OK to call it from
separate threads. One thing that concerns me is that it touches a HashMap,
which is not thread-safe, here:
https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
A KafkaStreams#removeStreamThread() call can mutate this HashMap object.
Given that, I'm not so sure it is designed to be thread-safe.

My questions here: is it OK to call KafkaStreams#store from a thread that is
different from the one that instantiated the KafkaStreams object? Or would it
be better to call the store() method in the same thread and share only the
ReadOnlyWindowStore instance with other threads? If so, would the store object
be able to capture the changes that would be made by rebalancing? Is the
KafkaStreams class designed to be thread-safe at all?

Regards,
Kohei