Hi John,

Thank you for your feedback!

Please find my answers inline.

Best,
Bruno

On 26.08.20 17:49, John Roesler wrote:
Hi Bruno,

Thanks for the well motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.


This makes sense to me. I will rename the methods.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.


You are right, `close(Duration.ZERO)` would avoid the deadlock. However, `close(Duration.ZERO)` does not guarantee that all resources get closed. Furthermore, it is error-prone to rely on users to use the correct overload. One possibility to prevent users from using the wrong overload is to check in `close()` if the calling thread is a stream thread and in that case to call `close(0)` instead of `close(Long.MAX_VALUE)`. Maybe I could also find a solution for the resource issue. Let me think about it. In conclusion, I agree that a new method may not be needed.
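
A minimal sketch of the caller check described above, for illustration only. `ClientSketch`, `registerStreamThread`, and `defaultCloseTimeout` are hypothetical names and not part of the Kafka Streams API; the sketch only shows how `close()` could pick a zero timeout when it is invoked from one of the client's own stream threads.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a Streams client; not the real KafkaStreams class. */
final class ClientSketch {
    // Threads owned by this client (assumption: the client tracks them itself).
    private final Set<Thread> streamThreads = ConcurrentHashMap.newKeySet();

    void registerStreamThread(Thread thread) {
        streamThreads.add(thread);
    }

    /** Timeout the no-argument close() would use when called by the given thread. */
    Duration defaultCloseTimeout(Thread caller) {
        // A stream thread waiting for itself to stop would deadlock, so do not wait.
        return streamThreads.contains(caller)
            ? Duration.ZERO
            : Duration.ofMillis(Long.MAX_VALUE);
    }

    boolean close() {
        return close(defaultCloseTimeout(Thread.currentThread()));
    }

    boolean close(Duration timeout) {
        // Placeholder: a real client would signal shutdown and wait up to `timeout`.
        // With a zero timeout it cannot confirm that everything was closed.
        return !timeout.isZero();
    }
}
```

With this shape, users keep calling `close()` everywhere, and the overload choice no longer depends on them knowing which thread they are on. The resource issue mentioned above is not addressed by the sketch.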

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.


We definitely cannot reuse the name of a crashed stream thread immediately. What we could do is keep a list of previously used names (or second-hand names, to take up your hand metaphor) that are now free and reuse them for new stream threads. If there are no second-hand names, a counter is incremented and a new name is created. The list of second-hand names would be bounded by the maximum number of stream threads that were running concurrently. I guess that would not be too complex to implement and would avoid the pathological case. IMO, it would be less annoying to have a new stream thread's metrics shown in the graph of a stream thread that previously crashed than to run into the pathological case that costs money.
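
The naming scheme above could be sketched roughly as follows. `ThreadIndexPool` is a hypothetical helper, not existing Kafka Streams code, and it assumes that a thread name is derived from a numeric index (for example a fixed prefix plus the index) and that a crashed thread's index is released only after the thread is fully gone.

```java
import java.util.TreeSet;

/** Hypothetical helper: hands out thread indices and recycles freed ones. */
final class ThreadIndexPool {
    // Second-hand indices that are free again, smallest first.
    private final TreeSet<Integer> freed = new TreeSet<>();
    // Next never-used index; only grows when no second-hand index is available.
    private int next = 1;

    /** Returns a second-hand index if one is free, otherwise a fresh one. */
    synchronized int acquire() {
        Integer reused = freed.pollFirst();
        return reused != null ? reused : next++;
    }

    /** Marks an index as free once its thread has terminated. */
    synchronized void release(int index) {
        if (index < 1 || index >= next) {
            throw new IllegalArgumentException("index was never handed out: " + index);
        }
        freed.add(index);
    }
}
```

Because an index is only created when no freed one exists, the number of distinct indices never exceeds the largest number of threads that were alive at the same time, which is what keeps the set of metric tags bounded.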


4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance and create a new one.
If there is still a possibility that we'd need a terminally
corrupted state, it would probably be a mistake to add an
out-transition from it.

The documentation on that state says that it happens when
all StreamThreads die or when the Global thread dies. We
have a proposal from Navinder (KIP-406) to allow the Global
thread to automatically come back to life after some errors,
but presumably others would still be fatal.

I guess your reasoning is that if the cause of the ERROR
state happens to be just from all the StreamThreads dying,
and if there's a way to replace them, then it should be
possible to recover from this ERROR state.

Maybe that makes sense, and we should just note that if the
Global thread is dead, it won't be possible to transition
out of ERROR state.


Here, I was torn between ERROR and RUNNING.
I guess the key question here is: What is the meaning of ERROR?
Is it a terminal state that signals a fatal error that cannot be recovered from without a restart of the client? If yes, there should not be any transition from ERROR to any state. I think it makes sense to have such a terminal state. For this KIP, it means that crashing stream threads will never lead to the ERROR state, because with the new methods, users can always recover. I also think that we have enough other metrics that can be used to monitor the progress of a Kafka Streams client, e.g., lag and alive stream threads. Users do not need to rely on the ERROR state. So, I am convinced that we should keep the client in RUNNING even if no stream threads are alive.


5. Global thread

Should it be in the scope of this KIP to consider replacing
the GlobalStreamThread?


I would prefer to tackle that in a different KIP to make progress.

6. Restarting

It seems like, if I configure Streams to have, e.g., 1 thread
and then add more threads over its lifetime (maybe up to 8),
then I might be dismayed after a restart of the app to see
it went back to 1.

I guess it could just be my job as a user to update the
config to match when I add or remove threads. Maybe that's
the best place to land right now. It still might be a good
point to mention in the KIP (and the docs).


I agree that it is fine for now and I will add a few words in the KIP.


Thanks again!
-John


On Wed, 2020-08-26 at 16:31 +0200, Bruno Cadonna wrote:
Hi,

I would like to propose the following KIP to start and shut down stream
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients


Best,
Bruno
