Right now the error code is the assignor error, and 2 is coded for shutdown.
It could be expanded to encode the causes, or to carry other errors that need
to be communicated. For example, we could add error code 3 to close the thread
but leave the client in an error state, if we choose to do so in the future.
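
To illustrate, the codes could be modeled roughly like this. This is only a
sketch: the enum name `AppErrorCode`, the `NONE` entry at 0, and the name of
the code-3 constant are made up for illustration and are not defined by the
KIP. Only "2 means shutdown" and "3 is a possible future addition" come from
the text above.

```java
import java.util.Optional;

// Hypothetical sketch: names are illustrative, not the KIP's actual API.
enum AppErrorCode {
    NONE((short) 0),                  // assumed "no error" value
    SHUTDOWN_REQUESTED((short) 2),    // 2 is coded for shutdown
    // Possible future code: close the thread, leave the client in an error state.
    CLOSE_THREAD_CLIENT_IN_ERROR((short) 3);

    private final short code;

    AppErrorCode(short code) {
        this.code = code;
    }

    // The short value that would be written into the subscription info.
    short code() {
        return code;
    }

    // Decode a received short; codes this client does not know yield empty.
    static Optional<AppErrorCode> fromCode(short code) {
        for (AppErrorCode e : values()) {
            if (e.code == code) {
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }
}
```

In this sketch an unrecognized code decodes to an empty Optional, so adding a
new code later would not break a client that only knows the older ones.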

On Mon, Sep 21, 2020 at 3:43 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thanks for the KIP Walker.
>
> In the KIP we mentioned "In order to communicate the shutdown request from
> one client to the others we propose to update the SubscriptionInfoData to
> include a short field which will encode an error code." Is there a
> dedicated error code that we should define here, or is it case-by-case?
>
> On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > I am changing the name to "Add method to Shutdown entire Streams
> > Application". Since we are no longer using an Exception, it seems more
> > appropriate.
> >
> > Also it looks like the discussion is pretty much finished so I will be
> > calling it to a vote.
> >
> > thanks,
> > Walker
> >
> > On Sat, Sep 19, 2020 at 7:49 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Sounds good to me. I also feel that this call should be non-blocking,
> > > but I guess I was confused by the discussion thread into thinking that
> > > the API is designed in a blocking fashion, which contradicts my
> > > perspective, and hence I asked for clarification :)
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Sep 16, 2020 at 12:32 PM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > >
> > > > Hello Guozhang,
> > > >
> > > > As for the logging, I plan on having three logs. First, the client
> > > > logs that it is requesting an application shutdown; second, the leader
> > > > logs the processId of the invoker; third, the StreamRebalanceListener
> > > > logs that it is closing because of a `stream.appShutdown`. Hopefully
> > > > this will be enough to make the cause of the close clear.
> > > >
> > > > I see what you mean about the name being dependent on the behavior of
> > > > the method, so I will try to clarify. This is how I currently envision
> > > > the call working.
> > > >
> > > > It is not an option to directly initiate a shutdown through a
> > > > StreamThread object from a KafkaStreams object because "KafkaConsumer
> > > > is not safe for multi-threaded access". So how it works is that the
> > > > method in KafkaStreams finds the first alive thread and sets a flag in
> > > > the StreamThread. The StreamThread will receive the flag in its
> > > > runloop, then set the error code and trigger a rebalance; afterwards
> > > > it will stop processing. After KafkaStreams has set the flag it will
> > > > return true and continue running. If there are no alive threads the
> > > > shutdown will fail and return false.
> > > >
> > > > What do you think the blocking behavior should be? I think that the
> > > > StreamThread should definitely stop to prevent any of the corruption
> > > > we are trying to avoid by shutting down, but I don't see any advantage
> > > > of the KafkaStreams call blocking.
> > > >
> > > > You are correct to be concerned about the uncaught exception handler.
> > > > If there are no live StreamThreads the rebalance will not be started
> > > > at all, and this would be a problem. However, the user should be aware
> > > > of this because of the return of false and react appropriately. This
> > > > would also be fixed if we implemented our own handler so we can
> > > > rebalance before the StreamThread closes.
> > > >
> > > > With that in mind I believe that `initiateClosingAllClients` would be
> > > > an appropriate name. WDYT?
> > > >
> > > > Walker
> > > >
> > > >
> > > > On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Walker,
> > > > >
> > > > > Thanks for the updated KIP. Previously I was also a bit hesitant
> > > > > about the newly added public exception to communicate a
> > > > > user-requested whole app shutdown, but the reason I did not bring
> > > > > this up is that I feel there's still a need, from an operational
> > > > > perspective, to differentiate the scenarios where an instance is
> > > > > closed because of a) a local `streams.close()` being triggered, or
> > > > > b) a remote instance's `stream.shutdownApp` being triggered. So if we
> > > > > are going to remove that exception (which I'm also in favor of), we
> > > > > should at least differentiate via the log4j levels.
> > > > >
> > > > > Regarding the semantics that "It should wait to receive the shutdown
> > > > > request in the rebalance it triggers." I'm not sure I fully
> > > > > understand: since this may be triggered from the stream thread's
> > > > > uncaught exception handler, if that thread is already dead then maybe
> > > > > a rebalance listener would not even be fired at all. Although I know
> > > > > these are implementation details that you probably abstract away
> > > > > from the proposal, I'd like to make sure that we are on the same page
> > > > > regarding its blocking behavior since it is quite crucial to users as
> > > > > well. Could you elaborate a bit more?
> > > > >
> > > > > Regarding the function name, I guess my personal preference would
> > > > > depend on its actual blocking behavior as above :)
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson
> > > > > <wcarl...@confluent.io> wrote:
> > > > >
> > > > > > Hello all again,
> > > > > >
> > > > > > I have updated the KIP to no longer use an exception and instead
> > > > > > add a method to the KafkaStreams class. This seems to satisfy
> > > > > > everyone's concerns about how and when the functionality will be
> > > > > > invoked.
> > > > > >
> > > > > > There is still a question over the name. We must decide between
> > > > > > "shutdownApplication", "initiateCloseAll", "closeAllInstances" or
> > > > > > some variation.
> > > > > >
> > > > > > I am rather indifferent to the name. I think that they all get the
> > > > > > point across. The clearest to me would be shutdownApplication or
> > > > > > closeAllInstances, but WDYT?
> > > > > >
> > > > > > Walker
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson
> > > > > > <wcarl...@confluent.io> wrote:
> > > > > >
> > > > > > > Hello Guozhang and Bruno,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > I will respond in two parts, but I would like to clarify that I
> > > > > > > am not tied down to any of these names. Since we are still
> > > > > > > deciding whether we want to have an exception or not, I would
> > > > > > > rather not get tripped up on choosing a name just yet.
> > > > > > >
> > > > > > > Guozhang:
> > > > > > > 1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA
> > > > > > > error. I am not planning on changing the behavior of handling
> > > > > > > source topic deletion. That is being done in KIP-662 by Bruno. He
> > > > > > > is enabling the user to create their own handler, and
> > > > > > > shutdownApplication is giving them the option to shut down.
> > > > > > >
> > > > > > > 2) It seems that we will remove the Exception entirely, so this
> > > > > > > won't matter (see below).
> > > > > > >
> > > > > > > 3) It should wait to receive the shutdown request in the
> > > > > > > rebalance it triggers. That might be a better name. I am torn
> > > > > > > between using "application" or "all Instances" in a couple of
> > > > > > > places. I think we should pick one and be consistent, but I am
> > > > > > > unsure which is more descriptive.
> > > > > > >
> > > > > > > Bruno:
> > > > > > > I agree that in principle Exceptions should be used in
> > > > > > > exceptional cases. And I have added a method in KafkaStreams to
> > > > > > > handle cases where an Exception would not be appropriate. I guess
> > > > > > > you think that users should never throw a Streams Exception; then
> > > > > > > they could always throw and catch their own exception and call
> > > > > > > shutdownApplication from there. This would allow them to exit a
> > > > > > > processor if they wanted to shut down from there. I will update
> > > > > > > the KIP to remove the exception.
> > > > > > >
> > > > > > > I would like to add that in the case of trying to shut down from
> > > > > > > the uncaught exception handler, we need at least one StreamThread
> > > > > > > to be alive. So having our own handler instead of using the
> > > > > > > default one after the thread has died would let us always close
> > > > > > > the application.
> > > > > > >
> > > > > > > Walker
> > > > > > >
> > > > > > > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Walker,
> > > > > > >>
> > > > > > >> Thank you for the KIP!
> > > > > > >>
> > > > > > > >> I like the motivation of the KIP and the method to request a
> > > > > > > >> shutdown of all Kafka Streams clients of a Kafka Streams
> > > > > > > >> application. I think we really need such functionality to
> > > > > > > >> react to errors. However, I am not convinced that throwing an
> > > > > > > >> exception to shut down all clients is a good idea.
> > > > > > >>
> > > > > > > >> An exception signals an exceptional situation to which we can
> > > > > > > >> react in multiple ways depending on the context. The exception
> > > > > > > >> that you propose seems to me a well-defined user command
> > > > > > > >> rather than an exceptional situation. IMO, we should not use
> > > > > > > >> exceptions to control program flow because it mixes cause and
> > > > > > > >> effect. Hence, I would propose an invariant for public
> > > > > > > >> exceptions in Kafka Streams: the public exceptions in Kafka
> > > > > > > >> Streams should be caught by users, not thrown. But maybe I am
> > > > > > > >> missing the big advantage of using an exception here.
> > > > > > >>
> > > > > > > >> I echo Guozhang's third point about clarifying the behavior of
> > > > > > > >> the method and the naming.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Bruno
> > > > > > >>
> > > > > > >> On 16.09.20 06:28, Guozhang Wang wrote:
> > > > > > >> > Hello Walker,
> > > > > > >> >
> > > > > > >> > Thanks for proposing the KIP! I have a couple more comments:
> > > > > > >> >
> > > > > > > >> > 1. ShutdownRequestedException: my understanding is that this
> > > > > > > >> > exception is only used if the application shutdown was
> > > > > > > >> > initiated by the user-triggered "shutdownApplication()";
> > > > > > > >> > otherwise, e.g. if it is due to a source topic not being
> > > > > > > >> > found and the Streams library decides to close the whole
> > > > > > > >> > application automatically, we would still throw the original
> > > > > > > >> > exception, a.k.a. MissingSourceTopicException, to the
> > > > > > > >> > uncaught exception handling. Is that the case? Also, for
> > > > > > > >> > this exception, which package are you proposing to add it
> > > > > > > >> > to?
> > > > > > >> >
> > > > > > > >> > 2. ShutdownRequestedException: for its constructor, I'm
> > > > > > > >> > wondering what Throwable "root cause" it could ever be.
> > > > > > > >> > Since I'm guessing here that we would still just use a
> > > > > > > >> > single error code in the protocol to tell other instances to
> > > > > > > >> > shut down, and that error code would not allow us to encode
> > > > > > > >> > any more information like root causes at all, it seems that
> > > > > > > >> > parameter would always be null.
> > > > > > >> >
> > > > > > > >> > 3. shutdownApplication: again I'd like to clarify, would
> > > > > > > >> > this function block on the local instance to complete
> > > > > > > >> > shutting down all its threads like `close()` as well, or
> > > > > > > >> > would it just initiate the shutdown and not wait for local
> > > > > > > >> > threads at all? Also a nit suggestion regarding the name: if
> > > > > > > >> > it is only for initiating the shutdown, maybe naming it
> > > > > > > >> > "initiateCloseAll" would be more specific?
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Guozhang
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson
> > > > > > > >> > <wcarl...@confluent.io> wrote:
> > > > > > >> >
> > > > > > >> >> Hello Matthias and Sophie,
> > > > > > >> >>
> > > > > > > >> >> You both make good points. I will respond to them
> > > > > > > >> >> separately below.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> Matthias:
> > > > > > > >> >> That is a fair point. KIP-662
> > > > > > > >> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
> > > > > > > >> >> which is accepted, will make it so that source topic
> > > > > > > >> >> deletion makes it to the uncaught exception handler.
> > > > > > > >> >> Shutdown can be initiated from there. However, this would
> > > > > > > >> >> mean that the stream thread is already dead. So I would
> > > > > > > >> >> have to rethink the exception for this use case; perhaps
> > > > > > > >> >> it would be needed in the KafkaStreams object. But this
> > > > > > > >> >> still leaves the case where there is only one stream
> > > > > > > >> >> thread. I will think about it.
> > > > > > >> >>
> > > > > > > >> >> Maybe the source topics are a bad example, as it makes this
> > > > > > > >> >> KIP dependent on KIP-662 getting implemented in a certain
> > > > > > > >> >> way. However, this is not the only reason this could be
> > > > > > > >> >> useful. Here
> > > > > > > >> >> <https://issues.apache.org/jira/browse/KAFKA-4748> is a
> > > > > > > >> >> Jira ticket asking for the same functionality. I have added
> > > > > > > >> >> a few other use cases to the KIP, although I will still be
> > > > > > > >> >> rethinking where I want to add this functionality and
> > > > > > > >> >> whether it should be an exception or not.
> > > > > > >> >>
> > > > > > >> >> Sophie:
> > > > > > > >> >> I agree that shutting down an instance could also be
> > > > > > > >> >> useful. There was some discussion about this on KIP-663. It
> > > > > > > >> >> seems that we came to the conclusion that
> > > > > > > >> >> close(Duration.ZERO) would be sufficient. Link to thread:
> > > > > > > >> >> <https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
> > > > > > >> >>
> > > > > > > >> >> Also, I am not set on the name ShutdownRequested. If we
> > > > > > > >> >> decide to keep it as an exception, your idea is probably a
> > > > > > > >> >> better name.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> Thanks for the feedback,
> > > > > > >> >> Walker
> > > > > > >> >>
> > > > > > >> >>
> > > > > > > >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax
> > > > > > > >> >> <mj...@apache.org> wrote:
> > > > > > >> >>
> > > > > > >> >>> Thanks for the KIP.
> > > > > > >> >>>
> > > > > > > >> >>> It seems that the new exception would need to be thrown by
> > > > > > > >> >>> user code? However, in the motivation you mention the
> > > > > > > >> >>> scenario of a missing source topic that a user cannot
> > > > > > > >> >>> detect, but that the KafkaStreams runtime would be
> > > > > > > >> >>> responsible for handling.
> > > > > > >> >>>
> > > > > > >> >>> How do both things go together?
> > > > > > >> >>>
> > > > > > >> >>>
> > > > > > >> >>> -Matthias
> > > > > > >> >>>
> > > > > > >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> > > > > > >> >>>> Hello all,
> > > > > > >> >>>>
> > > > > > > >> >>>> I have created KIP-671 to give the option to shut down a
> > > > > > > >> >>>> streams application in response to an error.
> > > > > > > >> >>>>
> > > > > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > > > > >> >>>>
> > > > > > >> >>>> This is because of the Jira ticket
> > > > > > >> >>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> > > > > > >> >>>>
> > > > > > >> >>>> Please give it a look and let me know if you have any
> > > feedback.
> > > > > > >> >>>>
> > > > > > >> >>>> Thanks,
> > > > > > >> >>>> Walker
> > > > > > >> >>>>
> > > > > > >> >>>
> > > > > > >> >>>
> > > > > > >> >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
