Re: Is the KafkaStreams#store() method thread-safe?

2023-12-26 Thread Sophie Blee-Goldman
Hey Kohei,

Good question -- I don't think there's a short answer to this
seemingly simple question, so bear with me for a second.

My understanding is that KafkaStreams#store is very much intended to be
thread-safe, and it was back when it was first added a long time
ago; the javadocs should probably be updated to reflect that.

That said, you are totally right that whatever the intention, it is
technically not completely thread-safe anymore, since the storeProviders map
can be mutated when threads are added or removed. Of course, as long as you
are not adding or removing StreamThreads in your application, it should be
effectively thread-safe (but note: this includes using the REPLACE_THREAD
option with the StreamsUncaughtExceptionHandler).

We should go ahead and fix this of course. I'm pretty sure we can just
change the HashMap to a ConcurrentHashMap and be done with it -- there's
already locking around the actual map modifications with the
"changeThreadCount" lock in KafkaStreams, so we just need to make sure we
don't accidentally hit a ConcurrentModificationException by accessing this
map while it's being modified.
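To make the idea concrete, here is a hypothetical sketch of that pattern. The names (StoreProviderRegistry, lookup, threadChangeLock) are illustrative stand-ins, not the actual KafkaStreams internals: reads go straight to a ConcurrentHashMap with no locking, while structural changes stay serialized behind a lock, mirroring the existing "changeThreadCount" locking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the fix discussed above, not the real KafkaStreams code.
class StoreProviderRegistry {
    // ConcurrentHashMap tolerates concurrent readers and writers, so
    // lookups never throw ConcurrentModificationException.
    private final Map<String, String> providers = new ConcurrentHashMap<>();
    // Stand-in for the "changeThreadCount" lock in KafkaStreams.
    private final Object threadChangeLock = new Object();

    // Called from any thread (analogous to KafkaStreams#store): lock-free read.
    String lookup(String threadName) {
        return providers.get(threadName);
    }

    // Called only when a stream thread is added; the lock keeps
    // add/remove operations mutually exclusive with each other.
    void addProvider(String threadName, String provider) {
        synchronized (threadChangeLock) {
            providers.put(threadName, provider);
        }
    }

    // Called only when a stream thread is removed.
    void removeProvider(String threadName) {
        synchronized (threadChangeLock) {
            providers.remove(threadName);
        }
    }
}
```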

Would you mind submitting a JIRA ticket for this bug you
found? And would you be interested in submitting a patch yourself?

Thanks!
Sophie

On Fri, Dec 22, 2023 at 6:55 PM Kohei Nozaki  wrote:

> Hello, I have Kafka Streams questions related to thread safety.
>
> In my Kafka Streams application, I have 2 threads below:
>
> * Thread A: this creates a Topology object including state stores and
> everything and then eventually calls the constructor of the KafkaStreams
> class and the start() method.
>
> * Thread B: this has a reference to the KafkaStreams object the thread A
> created. This periodically calls KafkaStreams#store on the object, gets a
> ReadOnlyWindowStore instance and reads the data in the store for monitoring
> purposes.
>
> I'm wondering if what my app does is ok in terms of thread safety. I'm
> not so worried about ReadOnlyWindowStore because the javadoc says:
> “Implementations should be thread-safe as concurrent reads and writes are
> expected.”
>
> But as for KafkaStreams#store, I'm not so sure if it is ok to call from
> separate threads. One thing which concerns me is that it touches a HashMap,
> which is not thread safe here
> https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
> . A KafkaStreams#removeStreamThread() call can mutate this HashMap object.
> Given that, I'm not so sure if this is designed to be thread-safe.
>
> My questions here: is it ok to call KafkaStreams#store from a thread which
> is different from the one which instantiated the KafkaStreams object? Or
> would it be better to call the store() method in the same thread and
> share only the ReadOnlyWindowStore instance with other threads? If that were
> better, would the store object still be able to capture the changes made
> by rebalancing? Is the KafkaStreams class designed to be thread-safe at
> all?
>
> Regards,
> Kohei
>
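For reference, the two-thread setup described above can be reduced to a small self-contained sketch. Everything here is a stand-in (an AtomicReference plays the role of the shared KafkaStreams object, a plain map the role of the store); the point is only the safe publication of the reference from thread A to thread B:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the setup in the question; no Kafka Streams API is used.
class TwoThreadSketch {
    static String run() {
        // Shared reference, analogous to the KafkaStreams object thread A creates.
        AtomicReference<Map<Long, String>> streamsRef = new AtomicReference<>();
        AtomicReference<String> observed = new AtomicReference<>();
        // Signals that "start()" has completed, so thread B doesn't read too early.
        CountDownLatch started = new CountDownLatch(1);

        // Thread A: builds the "store" and publishes the reference.
        Thread threadA = new Thread(() -> {
            Map<Long, String> store = new ConcurrentHashMap<>();
            store.put(1L, "window-1");
            streamsRef.set(store);
            started.countDown();
        });

        // Thread B: waits for startup, then reads for monitoring purposes.
        Thread threadB = new Thread(() -> {
            try {
                started.await();
                observed.set(streamsRef.get().get(1L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        threadA.start();
        threadB.start();
        try {
            threadA.join();
            threadB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }
}
```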


Re: where to capture a failed task's exception

2023-12-26 Thread Greg Harris
Hey Akash,

Thanks for the question! For a direct answer, no: throwing exceptions
from poll() is only one of many ways that a task can fail.

If you look at the AK source, every failure ultimately uses the
AbstractStatus.State.FAILED enum [1]. You can trace the usages of this
enum back to see all of the ways that a connector or task can fail.
The failure trace is also exposed in the REST API, which is a stable
public API you can depend on.

[1]: https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java#L27C13-L27C13
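To make the REST-API approach concrete, here is a hypothetical sketch of the filtering logic the question describes, applied to the JSON returned by GET /connectors/{name}/status. The class name and the naive string matching are illustrative only; a real implementation would fetch the status with an HTTP client and parse it with a proper JSON library.

```java
// Hypothetical sketch: decide whether a FAILED task should be counted in the
// metric, based on the status JSON from GET /connectors/{name}/status.
class TaskFailureFilter {
    // Failure causes we deliberately do NOT want a metric for.
    private static final String[] IGNORED = {
        "UnknownTopicOrPartitionException",
        "ConfigException",
    };

    static boolean shouldEmitMetric(String statusJson) {
        if (!statusJson.contains("\"state\":\"FAILED\"")) {
            return false; // task is not failed at all
        }
        for (String ignored : IGNORED) {
            if (statusJson.contains(ignored)) {
                return false; // failed for a cause we deliberately skip
            }
        }
        return true; // failed for any other reason: emit the metric
    }
}
```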

Happy to help,
Greg

On Tue, Dec 26, 2023 at 2:41 PM Akash Dhiman  wrote:
>
> Hello,
>
> I have a requirement where I need to detect failed tasks and emit a metric
> only when the failure is not caused by certain specific errors (these
> include unknown_topic_or_partition, specific cases of ConfigException,
> etc.)
>
> I know about a similar metric accessible via Prometheus, but that gives me
> the count of failed tasks regardless of the reason.
>
> I was thinking that wrapping the task's poll method in a try/catch block
> would be sufficient, where I check the error details in the catch block and
> emit the metric when they don't match the errors I want to exclude,
>
> but I am unsure if this captures all possible scenarios for which a task
> might fail.
>
> Is it guaranteed that all the exceptions/errors for which a task might fail
> get thrown from the poll method?

