Hello all,

Thanks for bringing this up, Bruno. It’s a really good point that a 
disconnected node would miss the signal and then resurrect a single-node 
“zombie cluster” when it reconnects.

Offhand, I can’t think of a simple and reliable way to distinguish this case 
from one in which an operator starts a node manually after a prior shutdown 
signal. Can you? Right now, I’m inclined to agree with Walker that we should 
leave this as a problem for the future. 

It should certainly be mentioned in the KIP, and it also deserves special 
mention in our Javadoc and HTML docs for this feature.
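
For anyone skimming the thread, here is a rough sketch of the mechanism as 
Walker describes it further down: the KafkaStreams-side call sets a flag on 
the first alive thread and returns right away, and the thread picks the flag 
up in its run loop, sets the error code, and stops processing. Every name in 
it (ShutdownRequestSketch, WorkerSketch, initiateShutdown) is made up for 
illustration and is not the KIP's API. The rebalance is only a comment, and 
the one detail taken from the thread is that error code 2 stands for shutdown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch only; none of these names exist in Kafka Streams. */
public class ShutdownRequestSketch {

    // The thread says 2 is coded for shutdown; the constant names are assumptions.
    static final short NO_ERROR = 0;
    static final short SHUTDOWN_REQUESTED = 2;

    /** Stand-in for a StreamThread: only its own loop would touch the consumer. */
    static final class WorkerSketch {
        private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
        private volatile boolean alive = true;
        private volatile short errorCode = NO_ERROR;

        boolean isAlive() { return alive; }

        short errorCode() { return errorCode; }

        /** Called from another thread; it only sets a flag and never blocks. */
        void requestShutdown() { shutdownRequested.set(true); }

        /** One run-loop iteration; returns false once the worker has stopped. */
        boolean runOnce() {
            if (!alive) {
                return false;
            }
            if (shutdownRequested.get()) {
                // The code that would be encoded in the subscription data.
                errorCode = SHUTDOWN_REQUESTED;
                // A real implementation would trigger a rebalance here.
                alive = false; // stop processing afterwards
                return false;
            }
            // Normal record processing would happen here.
            return true;
        }
    }

    /** Stand-in for the KafkaStreams-side call: flag the first alive worker. */
    static boolean initiateShutdown(List<WorkerSketch> workers) {
        for (WorkerSketch worker : workers) {
            if (worker.isAlive()) {
                worker.requestShutdown();
                return true; // returns immediately, the caller keeps running
            }
        }
        return false; // no alive worker, so no rebalance can be started
    }

    public static void main(String[] args) {
        WorkerSketch worker = new WorkerSketch();
        List<WorkerSketch> workers = Arrays.asList(worker);
        System.out.println("request accepted: " + initiateShutdown(workers));
        System.out.println("still processing: " + worker.runOnce());
        System.out.println("error code set: " + worker.errorCode());
        System.out.println("second request accepted: " + initiateShutdown(workers));
    }
}
```

Note that the call does not block: it returns true as soon as the flag is 
set, and false when no thread is alive, which is the uncaught exception 
handler case discussed below.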

Thanks!
John

On Wed, Sep 23, 2020, at 17:49, Walker Carlson wrote:
> Bruno,
> 
> I think that we can't guarantee that the message will get
> propagated perfectly in every case of, say network partitioning, though it
> will work for many cases. So I would say it's best effort and I will
> mention it in the kip.
> 
> As for when to use it I think we can discuss if this will be
> sufficient when we come to it, as long as we document its capabilities.
> 
> I hope this answers your question,
> 
> Walker
> 
> On Tue, Sep 22, 2020 at 12:33 AM Bruno Cadonna <br...@confluent.io> wrote:
> 
> > Walker,
> >
> > I am sorry, but I still have a comment on the KIP although you have
> > already started voting.
> >
> > What happens when a consumer of the group skips the rebalancing that
> > propagates the shutdown request? Do you give a guarantee that all Kafka
> > Streams clients are shut down, or is it best effort? If it is best effort,
> > I guess the proposed method might not be used in critical cases where
> > stopping record consumption may prevent or limit damage. I am not saying
> > that it must be a guarantee, but this question should be answered in the
> > KIP, IMO.
> >
> > Best,
> > Bruno
> >
> > On 22.09.20 01:14, Walker Carlson wrote:
> > > The error code right now is the assignor error, 2 is coded for shutdown
> > > but it could be expanded to encode the causes or for other errors that
> > need
> > > to be communicated. For example we can add error code 3 to close the
> > thread
> > > but leave the client in an error state if we choose to do so in the
> > future.
> > >
> > > On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for the KIP Walker.
> > >>
> > >> In the KIP we mentioned "In order to communicate the shutdown request
> > from
> > >> one client to the others we propose to update the SubscriptionInfoData to
> > >> include a short field which will encode an error code.", is there a
> > >> dedicated error code that we should define here, or it is case-by-case?
> > >>
> > >> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io>
> > >> wrote:
> > >>
> > >>> I am changing the name to "Add method to Shutdown entire Streams
> > >>> Application" since we are no longer using an Exception, it seems more
> > >>> appropriate.
> > >>>
> > >>> Also it looks like the discussion is pretty much finished so I will be
> > >>> calling it to a vote.
> > >>>
> > >>> thanks,
> > >>> Walker
> > >>>
> > >>> On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>>
> > >>>> Sounds good to me. I also feel that this call should be non-blocking
> > >> but
> > >>> I
> > >>>> guess I was confused from the discussion thread that the API is
> > >> designed
> > >>> in
> > >>>> a blocking fashion which contradicts with my perspective and hence I
> > >>> asked
> > >>>> for clarification :)
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <
> > wcarl...@confluent.io
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hello Guozhang,
> > >>>>>
> > >>>>> As for the logging I plan on having three logs. First, the client log
> > >>>> that
> > >>>>> it is requesting an application shutdown, second, the leader log
> > >>>> processId
> > >>>>> of the invoker, third, then the StreamRebalanceListener it logs that
> > >> it
> > >>>> is
> > >>>>> closing because of an `stream.appShutdown`. Hopefully this will be
> > >>> enough
> > >>>>> to make the cause of the close clear.
> > >>>>>
> > >>>>> I see what you mean about the name being dependent on the behavior of
> > >>> the
> > >>>>> method so I will try to clarify.  This is how I currently envision
> > >> the
> > >>>> call
> > >>>>> working.
> > >>>>>
> > >>>>> It is not an option to directly initiate a shutdown through a
> > >>>> StreamThread
> > >>>>> object from a KafkaStreams object because "KafkaConsumer is not safe
> > >>> for
> > >>>>> multi-threaded access". So how it works is that the method in
> > >>>> KafkaStreams
> > >>>>> finds the first alive thread and sets a flag in the StreamThread. The
> > >>>>> StreamThread will receive the flag in its runloop then set the error
> > >>> code
> > >>>>> and trigger a rebalance, afterwards it will stop processing. After
> > >> the
> > >>>>> KafkaStreams has set the flag it will return true and continue
> > >> running.
> > >>>> If
> > >>>>> there are no alive threads the shutdown will fail and return false.
> > >>>>>
> > >>>>> What do you think the blocking behavior should be? I think that the
> > >>>>> StreamThread should definitely stop to prevent any of the corruption
> > >> we
> > >>>> are
> > >>>>> trying to avoid by shutting down, but I don't see any advantage of
> > >> the
> > >>>>> KafkaStreams call blocking.
> > >>>>>
> > >>>>> You are correct to be concerned about the uncaught exception handler.
> > >>> If
> > >>>>> there are no live StreamThreads the rebalance will not be started at
> > >>> all
> > >>>>> and this would be a problem. However the user should be aware of this
> > >>>>> because of the return of false and react appropriately. This would
> > >> also
> > >>>> be
> > >>>>> fixed if we implemented our own handler so we can rebalance before
> > >> the
> > >>>>> StreamThread closes.
> > >>>>>
> > >>>>> With that in mind I believe that `initiateClosingAllClients` would be
> > >>> an
> > >>>>> appropriate name. WDYT?
> > >>>>>
> > >>>>> Walker
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hello Walker,
> > >>>>>>
> > >>>>>> Thanks for the updated KIP. Previously I'm also a bit hesitant on
> > >> the
> > >>>>> newly
> > >>>>>> added public exception to communicate user-requested whole app
> > >>>> shutdown,
> > >>>>>> but the reason I did not bring this up is that I feel there's
> > >> still a
> > >>>>> need
> > >>>>>> from operational aspects that we can differentiate the scenario
> > >> where
> > >>>> an
> > >>>>>> instance is closed because of a) local `streams.close()` triggered,
> > >>> or
> > >>>>> b) a
> > >>>>>> remote instance's `stream.shutdownApp` triggered. So if we are
> > >> going
> > >>> to
> > >>>>>> remove that exception (which I'm also in favor), we should at least
> > >>>>>> differentiate from the log4j levels.
> > >>>>>>
> > >>>>>> Regarding the semantics that "It should wait to receive the
> > >> shutdown
> > >>>>>> request in the rebalance it triggers." I'm not sure I fully
> > >>> understand,
> > >>>>>> since this may be triggered from the stream thread's uncaught
> > >>> exception
> > >>>>>> handler, if that thread is already dead then maybe a rebalance
> > >>> listener
> > >>>>>> would not even be fired at all. Although I know this is some
> > >>>>> implementation
> > >>>>>> details that you probably abstract away from the proposal, I'd like
> > >>> to
> > >>>>> make
> > >>>>>> sure that we are on the same page regarding its blocking behavior
> > >>> since
> > >>>>> it
> > >>>>>> is quite crucial to users as well. Could you elaborate a bit more?
> > >>>>>>
> > >>>>>> Regarding the function name, I guess my personal preference would
> > >>>> depend
> > >>>>> on
> > >>>>>> its actual blocking behavior as above :)
> > >>>>>>
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <
> > >>> wcarl...@confluent.io
> > >>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hello all again,
> > >>>>>>>
> > >>>>>>> I have updated the kip to no longer use an exception and instead
> > >>> add
> > >>>> a
> > >>>>>>> method to the KafkaStreams class, this seems to satisfy
> > >> everyone's
> > >>>>>> concerns
> > >>>>>>> about how and when the functionality will be invoked.
> > >>>>>>>
> > >>>>>>> There is still a question over the name. We must decide between
> > >>>>>>> "shutdownApplication", "initiateCloseAll", "closeAllInstances" or
> > >>> some
> > >>>>>>> variation.
> > >>>>>>>
> > >>>>>>> I am rather indifferent to the name. I think that they all get
> > >> the
> > >>>>> point
> > >>>>>>> across. The clearest to me would be shutdownApplication or
> > >>>>>>> closeAllInstances, but WDYT?
> > >>>>>>>
> > >>>>>>> Walker
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <
> > >>>> wcarl...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Guozhang and Bruno,
> > >>>>>>>>
> > >>>>>>>> Thanks for the feedback.
> > >>>>>>>>
> > >>>>>>>> I will respond in two parts but I would like to clarify that I
> > >> am
> > >>>> not
> > >>>>>>> tied
> > >>>>>>>> down to any of these names, but since we are still deciding if
> > >> we
> > >>>>> want
> > >>>>>> to
> > >>>>>>>> have an exception or not I would rather not get tripped up on
> > >>>>> choosing
> > >>>>>> a
> > >>>>>>>> name just yet.
> > >>>>>>>>
> > >>>>>>>> Guozhang:
> > >>>>>>>> 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA
> > >>>> error. I
> > >>>>>> am
> > >>>>>>>> not planning on changing the behavior of handling source topic
> > >>>>>> deletion.
> > >>>>>>>> That is being done in KIP-662 by Bruno. He is enabling the user
> > >>> to
> > >>>>>> create
> > >>>>>>>> their own handler and shutdownApplication is giving them the
> > >>> option
> > >>>>> to
> > >>>>>>>> shutdown.
> > >>>>>>>>
> > >>>>>>>> 2) It seems that we will remove the Exception entirely so this
> > >>>> won't
> > >>>>>>>> matter (below)
> > >>>>>>>>
> > >>>>>>>> 3) It should wait to receive the shutdown request in the
> > >>> rebalance
> > >>>> it
> > >>>>>>>> triggers. That might be a better name. I am torn between using
> > >>>>>>>> "application" or "all Instances" in a couple places. I think we
> > >>>>> should
> > >>>>>>> pick
> > >>>>>>>> one and be consistent but I am unsure which is more
> > >> descriptive.
> > >>>>>>>>
> > >>>>>>>> Bruno:
> > >>>>>>>> I agree that in principle Exceptions should be used in
> > >> exception
> > >>>>> cases.
> > >>>>>>>> And I have added a method in KafkaStreams to handle cases where
> > >>> an
> > >>>>>>>> Exception would not be appropriate. I guess you think that
> > >> users
> > >>>>> should
> > >>>>>>>> never throw a Streams Exception then they could always throw
> > >> and
> > >>>>> catch
> > >>>>>>>> their own exception and call shutdown Application from there.
> > >>> This
> > >>>>>> would
> > >>>>>>>> allow them to exit a processor if they wanted to shutdown from
> > >>>>> there. I
> > >>>>>>>> will update the Kip to remove the exception.
> > >>>>>>>>
> > >>>>>>>> I would like to add that in the case of trying to shutdown from
> > >>> the
> > >>>>>>>> uncaught exception handler that we need at least one
> > >> StreamThread
> > >>>> to
> > >>>>> be
> > >>>>>>>> alive. So having our own handler instead of using the default
> > >> one
> > >>>>> after
> > >>>>>>> the
> > >>>>>>>> thread has died would let us always close the application.
> > >>>>>>>>
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <
> > >>> br...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Walker,
> > >>>>>>>>>
> > >>>>>>>>> Thank you for the KIP!
> > >>>>>>>>>
> > >>>>>>>>> I like the motivation of the KIP and the method to request a
> > >>>>> shutdown
> > >>>>>> of
> > >>>>>>>>> all Kafka Streams clients of a Kafka Streams application. I
> > >> think
> > >>> we
> > >>>>>>>>> really need such functionality to react on errors. However, I
> > >> am
> > >>>> not
> > >>>>>>>>> convinced that throwing an exception to shutdown all clients
> > >> is
> > >>> a
> > >>>>> good
> > >>>>>>>>> idea.
> > >>>>>>>>>
> > >>>>>>>>> An exception signals an exceptional situation to which we can
> > >>>> react
> > >>>>> in
> > >>>>>>>>> multiple ways depending on the context. The exception that you
> > >>>>> propose
> > >>>>>>>>> seems rather a well-defined user command than an exceptional
> > >>>>> situation
> > >>>>>> to
> > >>>>>>>>> me. IMO, we should not use exceptions to control program flow
> > >>>>> because
> > >>>>>> it
> > >>>>>>>>> mixes cause and effect. Hence, I would propose an invariant
> > >> for
> > >>>>> public
> > >>>>>>>>> exceptions in Kafka Streams. The public exceptions in Kafka
> > >>>> Streams
> > >>>>>>>>> should be caught by users, not thrown. But maybe I am missing
> > >>> the
> > >>>>> big
> > >>>>>>>>> advantage of using an exception here.
> > >>>>>>>>>
> > >>>>>>>>> I echo Guozhang's third point about clarifying the behavior of
> > >>> the
> > >>>>>>>>> method and the naming.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Bruno
> > >>>>>>>>>
> > >>>>>>>>> On 16.09.20 06:28, Guozhang Wang wrote:
> > >>>>>>>>>> Hello Walker,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for proposing the KIP! I have a couple more comments:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. ShutdownRequestedException: my understanding is that this
> > >>>>>> exception
> > >>>>>>>>> is
> > >>>>>>>>>> only used if the application-shutdown was initiated by
> > >> the
> > >>>> user
> > >>>>>>>>>> triggered "shutdownApplication()", otherwise e.g. if it is
> > >> due
> > >>>> to
> > >>>>>>> source
> > >>>>>>>>>> topic not found and Streams library decides to close the
> > >> whole
> > >>>>>>>>> application
> > >>>>>>>>>> automatically, we would still throw the original exception
> > >>>>>>>>>> a.k.a. MissingSourceTopicException to the uncaught exception
> > >>>>>> handling.
> > >>>>>>>>> Is
> > >>>>>>>>>> that the case? Also for this exception, which package are
> > >> you
> > >>>>>>> proposing
> > >>>>>>>>> to
> > >>>>>>>>>> add to?
> > >>>>>>>>>>
> > >>>>>>>>>> 2. ShutdownRequestedException: for its constructor, I'm
> > >>>> wondering
> > >>>>>> what
> > >>>>>>>>>> Throwable "root cause" could it ever be? Since I'm guessing
> > >>> here
> > >>>>>> that
> > >>>>>>> we
> > >>>>>>>>>> would just use a single error code in the protocol still to
> > >>> tell
> > >>>>>> other
> > >>>>>>>>>> instances to shutdown, and that error code would not allow
> > >> us
> > >>> to
> > >>>>>>> encode
> > >>>>>>>>> any
> > >>>>>>>>>> more information like root causes at all, it seems that
> > >>>> parameter
> > >>>>>>> would
> > >>>>>>>>>> always be null.
> > >>>>>>>>>>
> > >>>>>>>>>> 3. shutdownApplication: again I'd like to clarify, would
> > >> this
> > >>>>>> function
> > >>>>>>>>>> block on the local instance to complete shutting down all
> > >> its
> > >>>>>> threads
> > >>>>>>>>> like
> > >>>>>>>>>> `close()` as well, or would it just initiate the shutdown
> > >>> and
> > >>>>> not
> > >>>>>>>>> wait
> > >>>>>>>>>> for local threads at all? Also a nit suggestion regarding
> > >> the
> > >>>>> name,
> > >>>>>> if
> > >>>>>>>>> it
> > >>>>>>>>>> is only for initiating the shutdown, maybe naming as
> > >>>>>>> "initiateCloseAll"
> > >>>>>>>>>> would be more specific?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <
> > >>>>>>> wcarl...@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello Matthias and Sophie,
> > >>>>>>>>>>>
> > >>>>>>>>>>> You both make good points. I will respond to them separately
> > >>>>> below.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Matthias:
> > >>>>>>>>>>> That is a fair point. KIP-662
> > >>>>>>>>>>> <
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> > >>>>>>>>>>>> ,
> > >>>>>>>>>>> which
> > >>>>>>>>>>> is accepted, will make it so Source topic deletion will
> > >> make
> > >>> it
> > >>>>> to
> > >>>>>>> the
> > >>>>>>>>>>> uncaught exception handler. Shutdown can be initiated from
> > >>>> there.
> > >>>>>>>>> However
> > >>>>>>>>>>> this would mean that the stream thread is already dead. So
> > >> I
> > >>>>> would
> > >>>>>>>>> have to
> > >>>>>>>>>>> rethink the exception for this use case, perhaps it would
> > >> be
> > >>>>> needed
> > >>>>>>> in
> > >>>>>>>>> the
> > >>>>>>>>>>> KafkaStreams object. But this still leaves the case where
> > >>> there
> > >>>>> is
> > >>>>>>>>> only one
> > >>>>>>>>>>> stream thread. I will think about it.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Maybe the source topics are a bad example as it makes this
> > >>> kip
> > >>>>>>>>> dependent on
> > >>>>>>>>>>> Kip-662 getting implemented in a certain way. However this
> > >> is
> > >>>> not
> > >>>>>> the
> > >>>>>>>>> only
> > >>>>>>>>>>> reason this could be useful here
> > >>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-4748> is a
> > >> jira
> > >>>>>> ticket
> > >>>>>>>>> asking
> > >>>>>>>>>>> for the same functionality. I have added a few other use
> > >>> cases
> > >>>> to
> > >>>>>> the
> > >>>>>>>>> kip.
> > >>>>>>>>>>> Although I will still be rethinking where I want to add
> > >> this
> > >>>>>>>>> functionality
> > >>>>>>>>>>> and whether it should be an exception or not.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sophie:
> > >>>>>>>>>>> I agree that shutting down an instance could also be
> > >> useful.
> > >>>>> There
> > >>>>>>> was
> > >>>>>>>>> some
> > >>>>>>>>>>> discussion about this on KIP-663. It seems that we came to
> > >>> the
> > >>>>>>>>> conclusion
> > >>>>>>>>>>> that close(Duration.ZERO) would be sufficient. link
> > >>>>>>>>>>> <
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e
> > >>>>>>>>>>>>
> > >>>>>>>>>>> to
> > >>>>>>>>>>> thread
> > >>>>>>>>>>>
> > >>>>>>>>>>> Also I am not set on the name ShutdownRequested. If we
> > >> decide
> > >>>> to
> > >>>>>> keep
> > >>>>>>>>> it as
> > >>>>>>>>>>> an exception your idea is probably a better name.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the feedback,
> > >>>>>>>>>>> Walker
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <
> > >>>>> mj...@apache.org
> > >>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It seems that the new exception would need to be thrown by
> > >>> user
> > >>>>>> code?
> > >>>>>>>>>>>> However, in the motivation you mention the scenario of a
> > >>>> missing
> > >>>>>>>>> source
> > >>>>>>>>>>>> topic that a user cannot detect, but KafkaStreams runtime
> > >>>> would
> > >>>>> be
> > >>>>>>>>>>>> responsible to handle.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> How do both things go together?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > >>>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have created KIP-671 to give the option to shutdown a
> > >>>> streams
> > >>>>>>>>>>>>> application in response to an error.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> This is because of the Jira ticket
> > >>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Please give it a look and let me know if you have any
> > >>>> feedback.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >
> >
>
