Hello Walker,

Thanks for the updated KIP. Previously I'm also a bit hesitant on the newly
added public exception to communicate user-requested whole app shutdown,
but the reason I did not bring this up is that I feel there's still a need
from operational aspects that we can differentiate the scenario where an
instance is closed because of a) local `streams.close()` triggered, or b) a
remote instance's `stream.shutdownApp` triggered. So if we are going to
remove that exception (which I'm also in favor), we should at least
differentiate from the log4j levels.

Regarding the semantics that "It should wait to receive the shutdown
request in the rebalance it triggers." I'm not sure I fully understand,
since this may be triggered from the stream thread's uncaught exception
handler, if that thread is already dead then maybe a rebalance listener
would not even be fired at all. Although I know this is some implementation
details that you probably abstract away from the proposal, I'd like to make
sure that we are on the same page regarding its blocking behavior since it
is quite crucial to users as well. Could you elaborate a bit more?

Regarding the function name, I guess my personal preference would depend on
its actual blocking behavior as above :)


Guozhang




On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson <wcarl...@confluent.io>
wrote:

> Hello all again,
>
> I have updated the kip to no longer use an exception and instead add a
> method to the KafkaStreams class, this seems to satisfy everyone's concerns
> about how and when the functionality will be invoked.
>
> There is still a question over the name. We must decide between
> "shutdownApplication", "initateCloseAll", "closeAllInstaces" or some
> variation.
>
> I am rather indifferent to the name. I think that they all get the point
> across. The most clear to me would be shutdownApplicaiton or
> closeAllInstacnes but WDYT?
>
> Walker
>
>
>
> On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > Hello Guozhang and Bruno,
> >
> > Thanks for the feedback.
> >
> > I will respond in two parts but I would like to clarify that I am not
> tied
> > down to any of these names, but since we are still deciding if we want to
> > have an exception or not I would rather not get tripped up on choosing a
> > name just yet.
> >
> > Guozhang:
> > 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am
> > not planning on changing the behavior of handling source topic deletion.
> > That is being down in kip-662 by Bruno. He is enabling the user to create
> > their own handler and shutdownApplication is giving them the option to
> > shutdown.
> >
> > 2) It seems that we will remove the Exception entirely so this won't
> > matter (below)
> >
> > 3) It should wait to receive the shutdown request in the rebalance it
> > triggers. That might be a better name. I am torn between using
> > "application" or "all Instances" in a couple places. I think we should
> pick
> > one and be consistent but I am unsure which is more descriptive.
> >
> > Bruno:
> > I agree that in principle Exceptions should be used in exception cases.
> > And I have added a method in KafkaStreams to handle cases where an
> > Exception would not be appropriate. I guess you think that users should
> > never throw a Streams Exception then they could always throw and catch
> > their own exception and call shutdown Application from there. This would
> > allow them to exit a processor if they wanted to shutdown from there. I
> > will update the Kip to remove the exception.
> >
> > I would like to add that in the case of trying to shutdown from the
> > uncaught exception handler that we need at least one StreamThread to be
> > alive. So having our own handler instead of using the default one after
> the
> > thread has died would let us always close the application.
> >
> > Walker
> >
> > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> >
> >> Hi Walker,
> >>
> >> Thank you for the KIP!
> >>
> >> I like the motivation of the KIP and the method to request a shutdown of
> >> all Kafka Streams clients of Kafka Streams application. I think we
> >> really need such functionality to react on errors. However, I am not
> >> convinced that throwing an exception to shutdown all clients is a good
> >> idea.
> >>
> >> An exception signals an exceptional situation to which we can react in
> >> multiple ways depending on the context. The exception that you propose
> >> seems rather a well defined user command than a exceptional situation to
> >> me. IMO, we should not use exceptions to control program flow because it
> >> mixes cause and effect. Hence, I would propose an invariant for public
> >> exceptions in Kafka Streams. The public exceptions in Kafka Streams
> >> should be caught by users, not thrown. But maybe I am missing the big
> >> advantage of using an exception here.
> >>
> >> I echo Guozhang's third point about clarifying the behavior of the
> >> method and the naming.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 16.09.20 06:28, Guozhang Wang wrote:
> >> > Hello Walker,
> >> >
> >> > Thanks for proposing the KIP! I have a couple more comments:
> >> >
> >> > 1. ShutdownRequestedException: my understanding is that this exception
> >> is
> >> > only used if the application-shutdown was initiated by by the user
> >> > triggered "shutdownApplication()", otherwise e.g. if it is due to
> source
> >> > topic not found and Streams library decides to close the whole
> >> application
> >> > automatically, we would still throw the original exception
> >> > a.k.a. MissingSourceTopicException to the uncaught exception handling.
> >> Is
> >> > that the case? Also for this exception, which package are you
> proposing
> >> to
> >> > add to?
> >> >
> >> > 2. ShutdownRequestedException: for its constructor, I'm wondering what
> >> > Throwable "root cause" could it ever be? Since I'm guessing here that
> we
> >> > would just use a single error code in the protocol still to tell other
> >> > instances to shutdown, and that error code would not allow us to
> encode
> >> any
> >> > more information like root causes at all, it seems that parameter
> would
> >> > always be null.
> >> >
> >> > 3. shutdownApplication: again I'd like to clarify, would this function
> >> > block on the local instance to complete shutting down all its threads
> >> like
> >> > `close()` as well, or would it just to initiate the shutdown and not
> >> wait
> >> > for local threads at all? Also a nit suggestion regarding the name, if
> >> it
> >> > is only for initiating the shutdown, maybe naming as
> "initiateCloseAll"
> >> > would be more specific?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> >
> >> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson <
> wcarl...@confluent.io>
> >> > wrote:
> >> >
> >> >> Hello Matthias and Sophie,
> >> >>
> >> >> You both make good points. I will respond to the separately below.
> >> >>
> >> >>
> >> >> Matthias:
> >> >> That is a fair point. KIP-662
> >> >> <
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> >> >>> ,
> >> >> which
> >> >> is accepted, will make it so Source topic deletion will make it to
> the
> >> >> uncaught exception handler. Shutdown can be initiated from there.
> >> However
> >> >> this would mean that the stream thread is already dead. So I would
> >> have to
> >> >> rethink the exception for this use case, perhaps it would be needed
> in
> >> the
> >> >> KakfaStreams object. But this still leaves the case where there is
> >> only one
> >> >> stream thread. I will think about it.
> >> >>
> >> >> Maybe the source topics are a bad example as it makes this kip
> >> dependent on
> >> >> Kip-662 getting implemented in a certain way. However this is not the
> >> only
> >> >> reason this could be useful here
> >> >> <https://issues.apache.org/jira/browse/KAFKA-4748> is a jira ticket
> >> asking
> >> >> for the same functionality. I have added a few other use cases to the
> >> kip.
> >> >> Although I will still be rethinking where I want to add this
> >> functionality
> >> >> and whether it should be an exception or not.
> >> >>
> >> >> Sophie:
> >> >> I agree that shutting down an instance could also be useful. There
> was
> >> some
> >> >> discussion about this on KIP-663. It seems that we came to the
> >> conclusion
> >> >> that close(Duration.ZERO) would be sufficient. link
> >> >> <
> >> >>
> >>
> https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e
> >> >>>
> >> >> to
> >> >> thread
> >> >>
> >> >> Also I am not set on the name ShutdownRequested. If we decide to keep
> >> at as
> >> >> an exception your idea is probably a better name.
> >> >>
> >> >>
> >> >> Thanks for the feedback,
> >> >> Walker
> >> >>
> >> >>
> >> >> On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >> >>
> >> >>> Thanks for the KIP.
> >> >>>
> >> >>> It seem that the new exception would need to be thrown by user code?
> >> >>> However, in the motivation you mention the scenario of a missing
> >> source
> >> >>> topic that a user cannot detect, but KafkaStreams runtime would be
> >> >>> responsible to handle.
> >> >>>
> >> >>> How do both things go together?
> >> >>>
> >> >>>
> >> >>> -Matthias
> >> >>>
> >> >>> On 9/11/20 10:31 AM, Walker Carlson wrote:
> >> >>>> Hello all,
> >> >>>>
> >> >>>> I have created KIP-671 to give the option to shutdown a streams
> >> >>>> application in response to an error.
> >> >>>>
> >> >>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> >> >>>>
> >> >>>> This is because of the Jira ticket
> >> >>>> <https://issues.apache.org/jira/browse/KAFKA-9331>
> >> >>>>
> >> >>>> Please give it a look and let me know if you have any feedback.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Walker
> >> >>>>
> >> >>>
> >> >>>
> >> >>
> >> >
> >> >
> >>
> >
>


-- 
-- Guozhang

Reply via email to