Thanks for the KIP, Bruno.

While reading it, I had the same questions as raised by John and Walker
(so I won't repeat them).

In addition, I think that adding/removing threads should only be allowed
if the client state is RUNNING (but not in any other state, maybe except
ERROR). Furthermore, it seems that the methods must be `synchronized`
similar to `start()` and `close()`.
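To illustrate the kind of guard I mean, here is a minimal, self-contained sketch. The class, the `State` subset and the `addThread()`/`removeThread()` names are hypothetical (not the KIP's API and not the Kafka Streams implementation), and threads are represented only by their names. The point is that every method that changes the set of threads takes the same monitor as `start()` and `close()`, so a state change cannot interleave with an add or remove:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the Kafka Streams implementation: a client whose
// thread add/remove methods share the same monitor as start() and close().
class ThreadGuardSketch {

    // Hypothetical subset of client states, loosely modeled on KafkaStreams.State.
    enum State { CREATED, RUNNING, NOT_RUNNING, ERROR }

    private State state = State.CREATED;
    private final List<String> threadNames = new ArrayList<>();
    private int nextThreadIndex = 1;

    synchronized void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("Cannot start from state " + state);
        }
        state = State.RUNNING;
    }

    synchronized void close() {
        state = State.NOT_RUNNING;
        threadNames.clear();
    }

    // Only RUNNING permits changes here; whether ERROR should also be
    // allowed is left open in the discussion.
    private boolean threadChangesAllowed() {
        return state == State.RUNNING;
    }

    // Hypothetical method name; returns false instead of throwing when the
    // current state does not allow adding a thread.
    synchronized boolean addThread() {
        if (!threadChangesAllowed()) {
            return false;
        }
        threadNames.add("StreamThread-" + nextThreadIndex++);
        return true;
    }

    // Hypothetical method name; removes the most recently added thread.
    synchronized boolean removeThread() {
        if (!threadChangesAllowed() || threadNames.isEmpty()) {
            return false;
        }
        threadNames.remove(threadNames.size() - 1);
        return true;
    }

    synchronized int threadCount() {
        return threadNames.size();
    }
}
```

Returning `false` rather than throwing is just one choice for the sketch; throwing an `IllegalStateException` in the wrong state would be equally reasonable.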

While I understand that the current `close(Duration.ZERO)` is not the same
as `requestClose()`, I am wondering whether we should change the semantics
of `close()` instead of adding a new method.
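One possible direction, sketched under assumptions: `close(timeout)` could signal all threads and wait at most `timeout` for them, but never join the calling thread itself. That would avoid the self-deadlock Walker describes for a close issued from an uncaught exception handler. `CloseSketch` and its methods are hypothetical and only model the worker threads with a shutdown flag; this is not the Kafka Streams implementation:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the Kafka Streams implementation: close(timeout)
// signals every worker but never joins the thread that calls it, so calling
// it from a worker (e.g. from an uncaught exception handler) cannot self-deadlock.
class CloseSketch {
    private final List<Thread> workers = new CopyOnWriteArrayList<>();
    private volatile boolean shutdownRequested = false;

    // Starts a worker that runs an optional first task, then polls the flag.
    Thread startWorker(String name, Runnable firstTask) {
        Thread t = new Thread(() -> {
            if (firstTask != null) {
                firstTask.run();
            }
            while (!shutdownRequested) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, name);
        workers.add(t);
        t.start();
        return t;
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }

    // Returns true only if every worker stopped within the timeout.
    // With Duration.ZERO it only signals and reports the current status.
    boolean close(Duration timeout) {
        shutdownRequested = true;
        long deadline = System.nanoTime() + timeout.toNanos();
        Thread caller = Thread.currentThread();
        boolean allStopped = true;
        for (Thread t : workers) {
            if (t == caller) {
                // Joining ourselves would block forever; report "not yet stopped".
                allStopped = false;
                continue;
            }
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs > 0) {
                try {
                    t.join(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            if (t.isAlive()) {
                allStopped = false;
            }
        }
        return allStopped;
    }
}
```

With these semantics, a call from inside a worker returns `false` immediately (the caller is still alive), while a call from an outside thread can still wait for a clean shutdown, so a separate `requestClose()` might not be needed.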


Btw: for thread naming, I personally think that just using a counter (as
we do right now) might be ok. If this becomes an issue, we could improve
it later.
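A minimal sketch of the counter approach, assuming thread names of the form `<clientId>-StreamThread-<n>` (the format and the `ThreadNamer` class are illustrative assumptions):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: names stream threads from a per-client monotonic counter.
// The "<clientId>-StreamThread-<n>" format is an assumption for illustration.
class ThreadNamer {
    private final String clientId;
    private final AtomicInteger counter = new AtomicInteger(0);

    ThreadNamer(String clientId) {
        this.clientId = clientId;
    }

    // Each call returns a fresh index; names of dead or removed threads are never reused.
    String nextName() {
        return clientId + "-StreamThread-" + counter.incrementAndGet();
    }
}
```

Because the counter never goes back, a replacement thread never reuses a dead thread's name, which keeps their logs and metrics apart; the downside John mentions is that a crash-looping application accumulates an unbounded number of distinct thread names.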


-Matthias

On 8/26/20 9:46 AM, Walker Carlson wrote:
> Hello Bruno,
> 
> Thanks for the KIP!
> 
> Not to pile on, but I also had a couple additional questions. I am not
> super familiar with the StreamThread internals so please forgive any
> misconceptions if these are not relevant questions.
> 
> 1. In requestClose(), if a thread does not close properly and deadlocks for
> some reason, how will we avoid the client waiting on the thread to close,
> like when you try to close the Kafka Streams client from a thread's
> UncaughtExceptionHandler now?
> 
> The KIP says it would improve the handling of these conditions; however, I
> did not find it clear what strategy this improvement would use. Maybe
> handling broken threads is out of the scope of this KIP, or am I missing
> something?
> 
> 2a. Will the removal of Stream Threads in state DEAD be automatic? And will
> it be for all in that state or just for those closed with
> shutDownStreamThread?
> 
> 2b. From the wording, it seems that removing DEAD threads from the Kafka
> Streams client will be a new feature of this KIP. If that is the case, is
> there a reasonable possibility that keeping the dead threads in metadata
> might be useful, for example if a thread keeps erroring and being
> replaced by a restarted one?
> 
> 3. Maybe instead of addThread() we could use startNewThread()?
> I agree with John that startStreamThread could easily be misinterpreted,
> e.g., as startStreamThreads.
> 
> Thanks,
> Walker
> 
> On Wed, Aug 26, 2020 at 8:48 AM John Roesler <vvcep...@apache.org> wrote:
> 
>> Hi Bruno,
>>
>> Thanks for the well-motivated and thorough KIP!
>>
>> It's a good point that the record cache should be
>> redistributed over the threads.
>>
>> Reading your KIP leads me to a few questions:
>>
>> 1. Start vs. Add
>>
>> Maybe this is paranoid, but I'm mildly concerned that users
>> who don't read the docs too carefully might think they need
>> to call "start thread" to start their application's threads
>> after calling `KafkaStreams.start` to start the app.
>> Definitely not sure about this, but I'm wondering if it
>> would be more clear to say `addThread` and
>> `remove/dropThread` to make it clear that we are adding to
>> or subtracting from the total number of threads, not just
>> starting and stopping threads that are already in the pool.
>>
>> 2. requestClose() vs. close(Duration.ZERO)
>>
>> It's a very good point about deadlocks. Can you explain why
>> we need a new method, though? The specified behavior seems
>> the same as `close(Duration.ZERO)`.
>>
>> 3. Thread Naming
>>
>> Maybe this point is silly, but how will newly added threads
>> be numbered? Will dead and hence removed threads' names be
>> reused? Or will there be a monotonic counter for the
>> lifetime of the instance?
>>
>> It seems relevant to mention this because it will affect
>> metrics and logs. I guess _maybe_ it would be nice for the
>> thread that replaces a crashed thread to take over its name,
>> but since the crashed thread still exists while the
>> UncaughtExceptionHandler is executing, its name wouldn't be
>> up for grabs in any case yet.
>>
>> On the other hand, it might be nicer for operators to be
>> able to distinguish the logs/metrics of the replacement
>> thread from the dead one, so not reusing thread names might
>> be better.
>>
>> On the other hand, not reusing thread names in a
>> "replacement" exception handler means that a crashy
>> application would report an unbounded number of thread ids
>> over its lifespan. This might be an issue for people using
>> popular metrics aggregation services that charge per unique
>> combination of metrics tags. Then again, maybe this is a
>> pathological case not worth considering.
>>
>> And yes, I realized I just implied that I have three hands.
>>
>> 4. ERROR State
>>
>> Can you elaborate why users explicitly stopping all threads
>> should put the application into ERROR state? It does seem
>> like it's not exactly "running" at that point, but it also
>> doesn't seem like an error.
>>
>> Right now, ERROR is a terminal state that indicates users
>> must discard the application instance and create a new one.
>> If there is still a possibility that we'd need a terminally
>> corrupted state, it would probably be a mistake to add an
>> out-transition from it.
>>
>> The documentation on that state says that it happens when
>> all StreamThreads die or when the Global thread dies. We
>> have a proposal from Navinder (KIP-406) to allow the Global
>> thread to automatically come back to life after some errors,
>> but presumably others would still be fatal.
>>
>> I guess your reasoning is that if the cause of the ERROR
>> state happens to be just from all the StreamThreads dying,
>> and if there's a way to replace them, then it should be
>> possible to recover from this ERROR state.
>>
>> Maybe that makes sense, and we should just note that if the
>> Global thread is dead, it won't be possible to transition
>> out of ERROR state.
>>
>> 5. Global thread
>>
>> Should it be in the scope of this KIP to consider replacing
>> the GlobalStreamThread?
>>
>> 6. Restarting
>>
>> It seems like, if I configure Streams to have, e.g., 1 thread
>> and then add more threads over its lifetime (maybe up to 8),
>> then I might be dismayed after a restart of the app to see
>> it went back to 1.
>>
>> I guess it could just be my job as a user to update the
>> config to match when I add or remove threads. Maybe that's
>> the best place to land right now. It still might be a good
>> point to mention in the KIP (and the docs).
>>
>>
>> Thanks again!
>> -John
>>
>>
>> On Wed, 2020-08-26 at 16:31 +0200, Bruno Cadonna wrote:
>>> Hi,
>>>
>>> I would like to propose the following KIP to start and shut down stream
>>> threads during execution as well as to asynchronously shut down a Kafka
>>> Streams client from an uncaught exception handler.
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
>>>
>>>
>>> Best,
>>> Bruno
>>
>>
> 
