Hi Jim,

Thanks for the KIP. Overall LGTM!

One late question:
Could we run the stream resetter tool (i.e.
kafka-streams-application-reset.sh) during pause state?
I can imagine a use case where, after pausing for a while, a user just
wants to continue from the latest offset and skip the intermediate
records.

Thank you.
Luke

On Wed, May 11, 2022 at 10:12 AM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

> Hi Matthias,
>
> I like it.  I've updated the KIP to reflect that detail; I put the details
> in the docs for pause.
>
> Cheers,
>
> Jim
>
> On Tue, May 10, 2022 at 7:51 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for the KIP. Overall LGTM.
> >
> > Can we clarify one question: would it be allowed to call `pause()`
> > before calling `start()`? I don't see any reason why we would need to
> > disallow it?
> >
> > It could be helpful to start a KafkaStreams client in paused state --
> > otherwise there is a race between calling `start()` and calling
> > `pause()`.
> >
> > If we allow it, we should clearly document it.
> >
> >
> > -Matthias
> >
> > On 5/10/22 12:04 PM, Jim Hughes wrote:
> > > Hi Bill, all,
> > >
> > > > Thank you.  I've updated the KIP to reflect pausing standby tasks as
> > > > well.
> > > I think all the outstanding points have been addressed and I'm going to
> > > start the vote thread!
> > >
> > > Cheers,
> > >
> > > Jim
> > >
> > >
> > >
> > > On Tue, May 10, 2022 at 2:43 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > >
> > >> Hi Jim,
> > >>
> > >> After reading the comments on the KIP, I agree that it makes sense to
> > >> pause all activities and any changes can be made later on.
> > >>
> > >> Thanks,
> > >> Bill
> > >>
> > >> On Tue, May 10, 2022 at 4:03 AM Bruno Cadonna <cado...@apache.org>
> > wrote:
> > >>
> > >>> Hi Jim,
> > >>>
> > >>> Thanks for the KIP!
> > >>>
> > >>> I am fine with the KIP in general.
> > >>>
> > >>> However, I am with Sophie and John to also pause the standbys for the
> > >>> reasons they brought up. Is there a specific reason you want to keep
> > >>> standbys going? It feels like premature optimization to me. We can still
> > >>> add keeping standbys running in a follow-up if needed.
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 10.05.22 05:15, Sophie Blee-Goldman wrote:
> > >>>> Thanks Jim, just one note/question on the standby tasks:
> > >>>>
> > >>>>> At the minute, my moderately held position is that standby tasks ought to
> > >>>>> continue reading and remain caught up.  If standby tasks would run out of
> > >>>>> space, there are probably bigger problems.
> > >>>>
> > >>>>
> > >>>> For a single node application, or when the #pause API is invoked on
> > all
> > >>>> instances,
> > >>>> then there won't be any further active processing and thus nothing
> to
> > >>> keep
> > >>>> up with,
> > >>>> right? So for that case, it's just a matter of whether any standbys
> > >> that
> > >>>> are lagging
> > >>>> will have the chance to catch up to the (paused) active task state
> > >> before
> > >>>> they stop
> > >>>> as well, in which case having them continue feels fine to me.
> However
> > >>> this
> > >>>> is a
> > >>>> relatively trivial benefit and I would only consider it as a
> deciding
> > >>>> factor when all
> > >>>> things are equal otherwise.
> > >>>>
> > >>>> My concern is the more interesting case: when this feature is used to
> > >>>> pause only one node, or some subset of the overall application. In this
> > >>>> case, yes, the standby
> > >>>> tasks will indeed fall out of sync. But the only reason I can
> imagine
> > >>>> someone using
> > >>>> the pause feature in such a way is because there is something going
> > >>> wrong,
> > >>>> or about
> > >>>> to go wrong, on that particular node. For example as mentioned
> above,
> > >> if
> > >>>> the user
> > >>>> wants to cut down on costs without stopping everything, or if the
> node
> > >> is
> > >>>> about to
> > >>>> run out of disk or needs to be debugged or so on. And in this case,
> > >>>> continuing to
> > >>>> process the standby tasks while other instances continue to run
> would
> > >>>> pretty much
> > >>>> defeat the purpose of pausing it entirely, and might have unpleasant
> > >>>> consequences
> > >>>> for the unsuspecting developer.
> > >>>>
> > >>>> All that said, I don't want to block this KIP so if you have strong
> > >>>> feelings about the
> > >>>> standby behavior I'm happy to back down. I'm only pushing back now
> > >>> because
> > >>>> it
> > >>>> felt like there wasn't any particular motivation for the standbys to
> > >>>> continue processing
> > >>>> or not, and I figured I'd try to fill in this gap with my thoughts
> on
> > >> the
> > >>>> matter :)
> > >>>> Either way we should just make sure that this behavior is documented
> > >>>> clearly,
> > >>>> since it may be surprising if we decide to only pause active
> > processing
> > >>>> (another option
> > >>>> is to rename the method something like #pauseProcessing or
> > >>>> #pauseActiveProcessing
> > >>>> so that it's hard to miss).
> > >>>>
> > >>>> Thanks! Sorry for the lengthy response, but hopefully we won't need
> to
> > >>>> debate this any
> > >>>> further. Beyond this I'm satisfied with the latest proposal
> > >>>>
> > >>>> On Mon, May 9, 2022 at 5:16 PM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >>>>
> > >>>>> Thanks for the updates, Jim!
> > >>>>>
> > >>>>> After this discussion and your updates, this KIP looks good to me.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> John
> > >>>>>
> > >>>>> On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:
> > >>>>>> Hi Sophie, all,
> > >>>>>>
> > >>>>>> I've updated the KIP with feedback from the discussion so far:
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> > >>>>>>
> > >>>>>> As a terse summary of my current position:
> > >>>>>> Pausing will only stop processing and punctuation (respecting
> > modular
> > >>>>>> topologies).
> > >>>>>> Paused topologies will still a) consume from input topics, b) call
> > >> the
> > >>>>>> usual commit pathways (commits will happen basically as they would
> > >>> have),
> > >>>>>> and c) standBy tasks will still be processed.
> > >>>>>>
> > >>>>>> Shout if the KIP or those details still need some TLC.  Responding
> > to
> > >>>>>> Sophie inline below.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman
> > >>>>>> <sop...@confluent.io.invalid> wrote:
> > >>>>>>
> > >>>>>>> Don't worry, I'm going to be adding the APIs for topology-level
> > >>> pausing
> > >>>>> as
> > >>>>>>> part of the modular topologies KIP,
> > >>>>>>> so we don't need to worry about that for now. That said, I don't
> > >> think
> > >>>>> we
> > >>>>>>> should brush it off entirely and design
> > >>>>>>> this feature in a way that's going to be incompatible or hugely
> > >> raise
> > >>>>> the
> > >>>>>>> LOE on bringing the (mostly already
> > >>>>>>> implemented) modular topologies feature into the public API, just
> > >>>>>>> because it "won the race to write a KIP" :)
> > >>>>>>>
> > >>>>>>
> > >>>>>> Yes, I'm hoping that this is all compatible with modular
> > >> topologies.  I
> > >>>>>> haven't seen anything so far which seems to be a problem; this KIP
> > is
> > >>>>> just
> > >>>>>> in a weird state to discuss details of acting on modular
> > >> topologies.:)
> > >>>>>>
> > >>>>>>
> > >>>>>>> I may be biased (ok, I definitely am), but I'm not in favor of
> > >> adding
> > >>>>> this
> > >>>>>>> as a state regardless of the modular topologies.
> > >>>>>>> First of all any change to the KafkaStreams state machine is a
> > >>> breaking
> > >>>>>>> change, no? So we would have to wait until
> > >>>>>>> the next major release which seems like an unnecessary thing to
> > >> block
> > >>>>> on.
> > >>>>>>> (Whether to add this as a state to the
> > >>>>>>> StreamThread's FSM is an implementation detail).
> > >>>>>>>
> > >>>>>>
> > >>>>>> +1.  I am sold on skipping out on new states.  I had that as a
> > >> rejected
> > >>>>>> alternative in the KIP and have added a few more words to that
> bit.
> > >>>>>>
> > >>>>>>
> > >>>>>>> Also, the semantics of using an `isPaused` method to distinguish
> a
> > >>>>> paused
> > >>>>>>> instance (or topology) make more sense
> > >>>>>>> to me -- this is a user-specified status, whereas the
> KafkaStreams
> > >>>>> state is
> > >>>>>>> intended to relay the status of the system
> > >>>>>>> itself. For example, if we are going to continue to poll during
> > >> pause,
> > >>>>> then
> > >>>>>>> shouldn't the client transition to REBALANCING?
> > >>>>>>> I believe it makes sense to still allow distinguishing these
> states
> > >>>>> while a
> > >>>>>>> client is paused, whereas making PAUSED its
> > >>>>>>> own state means you can't tell when the client is rebalancing vs
> > >>>>> running,
> > >>>>>>> or whether it is paused or dead: presumably
> > >>>>>>> the NOT_RUNNING/ERROR state would trump the PAUSED state, which
> > >> means
> > >>>>> you
> > >>>>>>> would not be able to rely on
> > >>>>>>> checking the state to see if you had called PAUSED on that
> > instance.
> > >>>>>>> Obviously you can work around this by just
> > >>>>>>> maintaining a flag in the usercode, but all this feels very
> > >> unnatural
> > >>>>> to me
> > >>>>>>> vs just checking the `#isPaused` API.
> > >>>>>>>
> > >>>>>>> On that note, I had one question -- at what point would the
> > >>> `#isPaused`
> > >>>>>>> check return true? Would it do so immediately
> > >>>>>>> after pausing the instance, or only once it has finished
> committing
> > >>>>> offsets
> > >>>>>>> and stopped returning records?
> > >>>>>>>
> > >>>>>>
> > >>>>>> Immediately, `#isPaused` tells you about metadata.
> > >>>>>>
> > >>>>>>
> > >>>>>>> Finally, on the note of punctuators I think it would make most sense to
> > >>>>>>> either pause these as well or else add this as
> > >>>>>>> an explicit option for the user. If this feature is used to, for
> > >>>>> example,
> > >>>>>>> help save on processing costs while an app is
> > >>>>>>> not in use, then it would probably be surprising and perhaps
> > >> alarming
> > >>> to
> > >>>>>>> see certain kinds of processing still continue.
> > >>>>>>>
> > >>>>>>
> > >>>>>>   From other parts of the discussion, I'm sold on pausing
> > punctuation.
> > >>>>>>
> > >>>>>>
> > >>>>>>> The question of whether to continue fetching for standby tasks is
> > >>> maybe
> > >>>>> a
> > >>>>>>> bit more debatable, as it would certainly be
> > >>>>>>> nice to find your clients all caught up when you go to resume the
> > >>>>> instance
> > >>>>>>> again, but I would still strongly suggest
> > >>>>>>> pausing these as well. To use a similar example, imagine if you
> > >> paused
> > >>>>> an
> > >>>>>>> app because it was about to run out of
> > >>>>>>> disk. If the standbys kept processing and filled up the remaining
> > >>> space,
> > >>>>>>> you'd probably feel a bit betrayed by this API.
> > >>>>>>>
> > >>>>>>> WDYT?
> > >>>>>>>
> > >>>>>>
> > >>>>>> At the minute, my moderately held position is that standby tasks
> > >> ought
> > >>> to
> > >>>>>> continue reading and remain caught up.  If standby tasks would run
> > >> out
> > >>> of
> > >>>>>> space, there are probably bigger problems.
> > >>>>>>
> > >>>>>> If later it is desirable to manage punctuation or standby tasks,
> > then
> > >>> it
> > >>>>>> should be easy for future folks to modify things.
> > >>>>>>
> > >>>>>> Overall, I'd frame this KIP as "pause processing resulting in
> > >> outputs".
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>>
> > >>>>>> Jim
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>> On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <
> wangg...@gmail.com>
> > >>>>> wrote:
> > >>>>>>>
> > >>>>>>>> I think for named topology we can leave the scope of this KIP as
> > >> "all
> > >>>>> or
> > >>>>>>>> nothing", i.e. when you pause an instance you pause all of its
> > >>>>>>> topologies.
> > >>>>>>>> I raised this question in my previous email just trying to
> clarify
> > >> if
> > >>>>>>> this
> > >>>>>>>> is what you have in mind. We can leave the question of finer
> > >>>>> controlled
> > >>>>>>>> pausing behavior for later when we have named topology being
> > >> exposed
> > >>>>> via
> > >>>>>>>> another KIP.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>> On Mon, May 9, 2022 at 7:50 AM John Roesler <
> vvcep...@apache.org>
> > >>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Jim,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the replies. This all sounds good to me. Just two
> > >> further
> > >>>>>>>>> comments:
> > >>>>>>>>>
> > >>>>>>>>> 3. It seems like you should aim for the simplest semantics. If
> > the
> > >>>>>>> intent
> > >>>>>>>>> is to “pause” the instance, then you’d better pause the whole
> > >>>>> instance.
> > >>>>>>>> If
> > >>>>>>>>> you leave punctuations and standbys running, I expect we’d see
> > bug
> > >>>>>>>> reports
> > >>>>>>>>> come in that the instance isn’t really paused.
> > >>>>>>>>>
> > >>>>>>>>> 5. Since you won the race to write a KIP, I don’t think it
> makes
> > >> too
> > >>>>>>> much
> > >>>>>>>>> sense to worry too much about modular topologies. When they
> > >> propose
> > >>>>>>> their
> > >>>>>>>>> KIP, they will have to specify a lot of state management
> > behavior,
> > >>>>> and
> > >>>>>>>>> pause/resume will have to be part of it. If they have some
> > concern
> > >>>>>>> about
> > >>>>>>>>> your KIP, they’ll chime in. It doesn’t make sense for you to
> try
> > >> and
> > >>>>>>>> guess
> > >>>>>>>>> what that proposal will look like.
> > >>>>>>>>>
> > >>>>>>>>> To be honest, you’re proposing a KafkaStreams runtime-level
> > >>>>>>> pause/resume
> > >>>>>>>>> function, not a topology-level one anyway, so it seems pretty
> > >> clear
> > >>>>>>> that
> > >>>>>>>> it
> > >>>>>>>>> would pause the whole runtime (of a single instance) regardless
> > of
> > >>>>> any
> > >>>>>>>>> modular topologies. If the intent is to pause individual
> > >> topologies
> > >>>>> in
> > >>>>>>>> the
> > >>>>>>>>> future, you’d need a different API anyway.
> > >>>>>>>>>
> > >>>>>>>>> Thanks!
> > >>>>>>>>> -John
> > >>>>>>>>>
> > >>>>>>>>> On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:
> > >>>>>>>>>> Hi John,
> > >>>>>>>>>>
> > >>>>>>>>>> Long emails are great; responding inline!
> > >>>>>>>>>>
> > >>>>>>>>>> On Sat, May 7, 2022 at 4:54 PM John Roesler <
> > vvcep...@apache.org
> > >>>
> > >>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Thanks for the KIP, Jim!
> > >>>>>>>>>>>
> > >>>>>>>>>>> This conversation seems to highlight that the KIP needs to
> > >>>>> specify
> > >>>>>>>>>>> some of its behavior as well as its APIs, where the behavior
> is
> > >>>>>>>>>>> observable and significant to users.
> > >>>>>>>>>>>
> > >>>>>>>>>>> For example:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. Do you plan to have a guarantee that immediately after
> > >>>>>>>>>>> calling KafkaStreams.pause(), users should observe that the
> > >>>>> instance
> > >>>>>>>>>>> stops processing new records? Or should they expect that the
> > >>>>> threads
> > >>>>>>>>>>> will continue to process some records and pause
> asynchronously
> > >>>>>>>>>>> (you already answered this in the thread earlier)?
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I'm happy to build up to a guarantee of sorts.  My current
> idea
> > >> is
> > >>>>>>> that
> > >>>>>>>>>> pause() does not do anything "exceptional" to get control back
> > >>>>> from a
> > >>>>>>>>>> running topology.  A currently running topology would get to
> > >>>>> complete
> > >>>>>>>> its
> > >>>>>>>>>> loop.
> > >>>>>>>>>>
> > >>>>>>>>>> Separately, I'm still piecing together how commits work.  By
> > some
> > >>>>>>>>>> mechanism, after a pause, I do agree that the topology needs
> to
> > >>>>>>> commit
> > >>>>>>>>> its
> > >>>>>>>>>> work in some manner.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> 2. Will the threads continue to poll new records until they
> > >>>>>>> naturally
> > >>>>>>>>> fill
> > >>>>>>>>>>> up the task buffers, or will they immediately pause their
> > >>>>> Consumers
> > >>>>>>>>>>> as well?
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Presently, I'm suggesting that consumers would fill up their
> > >>>>> buffers.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> 3. Will threads continue to call (system time) punctuators,
> or
> > >>>>> would
> > >>>>>>>>>>> punctuations also be paused?
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> In my first pass at thinking through this, I left the
> > punctuators
> > >>>>>>>>> running.
> > >>>>>>>>>> To be honest, I'm not sure what they do, so my approach is
> > either
> > >>>>>>> lucky
> > >>>>>>>>> and
> > >>>>>>>>>> correct or it could be Very Clearly Wrong.;)
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> I realize that some of those questions simply may not have
> > >>>>> occurred
> > >>>>>>> to
> > >>>>>>>>>>> you, so this is not a criticism for leaving them off; I'm
> just
> > >>>>>>>> pointing
> > >>>>>>>>> out
> > >>>>>>>>>>> that although we don't tend to mention implementation details
> > in
> > >>>>>>> KIPs,
> > >>>>>>>>>>> we also can't be too high level, since there are a lot of
> > >>>>>>> operational
> > >>>>>>>>>>> details that users rely on to achieve various behaviors in
> > >>>>> Streams.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Ayup, I will add some details as we iron out the guarantees,
> > >>>>>>>>> implementation
> > >>>>>>>>>> details that are at the API level.  This one is tough since
> > >>>>> internal
> > >>>>>>>>>> features like NamedTopologies are part of the discussion.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> A couple more comments:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 4. +1 to what Guozhang said. It seems like we should also do a commit
> > >>>>>>>>>>> before entering the paused state. That way, any open
> > >> transactions
> > >>>>>>>> would
> > >>>>>>>>>>> be closed and not have to worry about timing out. Even under
> > >>>>> ALOS,
> > >>>>>>> it
> > >>>>>>>>>>> seems best to go ahead and complete the processing of
> in-flight
> > >>>>>>>> records
> > >>>>>>>>>>> by committing. That way, if anything happens to die while
> it's
> > >>>>>>> paused,
> > >>>>>>>>>>> existing
> > >>>>>>>>>>> work won't have to be repeated. Plus, if there are any
> > >> processors
> > >>>>>>> with
> > >>>>>>>>> side
> > >>>>>>>>>>> effects, users won't have to tolerate weird edge cases where
> a
> > >>>>> pause
> > >>>>>>>>> occurs
> > >>>>>>>>>>> after a processor sees a record, but before the result is
> sent
> > >> to
> > >>>>>>> its
> > >>>>>>>>>>> outputs.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 5. I noticed that you proposed not to add a PAUSED state,
> but I
> > >>>>>>> didn't
> > >>>>>>>>>>> follow
> > >>>>>>>>>>> the rationale. Adding a state seems beneficial for a number
> of
> > >>>>>>>> reasons:
> > >>>>>>>>>>> StreamThreads already use the thread state to determine
> whether
> > >>>>> to
> > >>>>>>>>> process
> > >>>>>>>>>>> or not, so avoiding a new State would just mean adding a
> > >> separate
> > >>>>>>> flag
> > >>>>>>>>> to
> > >>>>>>>>>>> track
> > >>>>>>>>>>> and then checking your new flag in addition to the State in
> the
> > >>>>>>>> thread.
> > >>>>>>>>>>> Also,
> > >>>>>>>>>>> operating Streams applications is a non-trivial task, and
> users
> > >>>>> rely
> > >>>>>>>> on
> > >>>>>>>>>>> the State
> > >>>>>>>>>>> (and transitions) to understand Streams's behavior. Adding a
> > >>>>> PAUSED
> > >>>>>>>>> state
> > >>>>>>>>>>> is an elegant way to communicate to operators what is
> happening
> > >>>>> with
> > >>>>>>>> the
> > >>>>>>>>>>> application. Note that the person digging through logs and
> > >>>>> metrics,
> > >>>>>>>>> trying
> > >>>>>>>>>>> to understand why the application isn't doing anything is
> > >>>>> probably
> > >>>>>>> not
> > >>>>>>>>>>> going
> > >>>>>>>>>>> to be the same person who is calling pause() and resume().
> > Also,
> > >>>>> if
> > >>>>>>>> you
> > >>>>>>>>> add
> > >>>>>>>>>>> a state, you don't need `isPaused()`.
> > >>>>>>>>>>>
> > >>>>>>>>>>> 5b. If you buy the arguments to go ahead and commit as well
> as
> > >>>>> the
> > >>>>>>>>>>> argument to add a State, then I'd also suggest to follow the
> > >>>>>>> existing
> > >>>>>>>>>>> patterns
> > >>>>>>>>>>> for the shutdown states by also adding PAUSING. That
> > >>>>>>>>>>> way, you'll also expose a way to understand that Streams
> > >> received
> > >>>>>>> the
> > >>>>>>>>>>> signal
> > >>>>>>>>>>> to pause, and that it's still processing and committing some
> > >>>>> records
> > >>>>>>>> in
> > >>>>>>>>>>> preparation to enter a PAUSED state. I'm not sure if a
> RESUMING
> > >>>>>>> state
> > >>>>>>>>> would
> > >>>>>>>>>>> also make sense.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I hit a tricky bit when thinking through having a PAUSED
> > >>>>> state...  If
> > >>>>>>>> one
> > >>>>>>>>>> is using Named Topologies, and some of them are paused, what
> > >>>>> state is
> > >>>>>>>> the
> > >>>>>>>>>> Streams instance in?  If we can agree on that, things may
> become
> > >>>>>>>>> clear....
> > >>>>>>>>>> I can see two quick ideas:
> > >>>>>>>>>>
> > >>>>>>>>>> 1.  The state is RUNNING and NamedTopologies have some other
> way
> > >>>>> to
> > >>>>>>>>>> indicate state.
> > >>>>>>>>>>
> > >>>>>>>>>> 2.  The state is something messy like PARTIALLY_PAUSED to
> > reflect
> > >>>>>>> that
> > >>>>>>>>> the
> > >>>>>>>>>> instance has something interesting going on.
> > >>>>>>>>>>
> > >>>>>>>>>> When I poked at things initially, I did try out having
> different
> > >>>>>>>> states,
> > >>>>>>>>>> and I readily agree that a PAUSING state may make sense.
> > >>>>> (Especially
> > >>>>>>>> if
> > >>>>>>>>>> there's a need to run commits before transitioning all the way
> > to
> > >>>>>>>>> PAUSED.)
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> And that's all I have to say about that. I hope you don't
> find
> > >> my
> > >>>>>>>>>>> long message offputting. I'm fundamentally in favor of your
> > KIP,
> > >>>>>>>>>>> and I think with a little more explanation in the KIP, and a
> > few
> > >>>>>>>>>>> small tweaks to the proposal, we'll be able to provide good
> > >>>>>>>>>>> ergonomics to our users.
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks!
> > >>>>>>>>>>
> > >>>>>>>>>> Jim
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> -John
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
> > >>>>>>>>>>>> I'm in favor of the "just pausing the instance itself"
> option
> > >>>>> as
> > >>>>>>>>> well. As
> > >>>>>>>>>>>> for EOS, the point is that when the processing is paused, we
> > >>>>> would
> > >>>>>>>> not
> > >>>>>>>>>>>> trigger any `producer.send` during the time, and the
> > >>>>> transaction
> > >>>>>>>>> timeout
> > >>>>>>>>>>> is
> > >>>>>>>>>>>> sort of relying on that behavior, so my point was that it's
> > >>>>>>> probably
> > >>>>>>>>>>> better
> > >>>>>>>>>>>> to also commit the processing before we pause it.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Fri, May 6, 2022 at 6:12 PM Jim Hughes
> > >>>>>>>>> <jhug...@confluent.io.invalid>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Since the only thing which will be paused is processing the
> > >>>>>>>>> topology, I
> > >>>>>>>>>>>>> think we can let commits happen naturally.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Good point about getting the paused state to new members;
> it
> > >>>>> is
> > >>>>>>>>> seeming
> > >>>>>>>>>>>>> like the "building block" approach is a good one to keep
> > >>>>> things
> > >>>>>>>>> simple
> > >>>>>>>>>>> at
> > >>>>>>>>>>>>> first.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Jim
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <
> > >>>>> mj...@apache.org
> > >>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think it's tricky to propagate a pauseAll() via the
> > >>>>> rebalance
> > >>>>>>>>>>>>>> protocol. New members joining the group would need to get
> > >>>>>>> paused,
> > >>>>>>>>> too?
> > >>>>>>>>>>>>>> Could there be weird race conditions with overlapping
> > >>>>>>> pauseAll()
> > >>>>>>>>> and
> > >>>>>>>>>>>>>> resumeAll() calls on different instances while there could be errors /
> > >>>>>>>>>>>>>> network partitions or similar?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I would argue that similar to IQ, we provide the basic
> > >>>>> building
> > >>>>>>>>>>> blocks,
> > >>>>>>>>>>>>>> and leave it to users to implement cross instance
> > >>>>>>>> management
> > >>>>>>>>>>> for a
> > >>>>>>>>>>>>>> pauseAll() scenario. -- Also, if there is really demand,
> we
> > >>>>> can
> > >>>>>>>>> always
> > >>>>>>>>>>>>>> add pauseAll()/resumeAll() as follow up work.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> About named topologies: I agree with Jim to not include them
> > >>>>> in
> > >>>>>>>> this
> > >>>>>>>>> KIP
> > >>>>>>>>>>>>>> as they are not a public feature yet. If we make named topologies
> > >>>>>>>>>>>>>> public, the corresponding KIP should extend the
> pause/resume
> > >>>>>>>>> feature
> > >>>>>>>>>>>>>> (ie, APIs) accordingly. Of course, the code can (and
> should)
> > >>>>>>>>> already
> > >>>>>>>>>>> be
> > >>>>>>>>>>>>>> setup to support it to be future proof.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Good call out about commit and EOS -- to simplify it, I
> > >>>>> think
> > >>>>>>> it
> > >>>>>>>>> might
> > >>>>>>>>>>>>>> be good to commit also for the at-least-once case?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On 5/6/22 1:05 PM, Jim Hughes wrote:
> > >>>>>>>>>>>>>>> Hi Bill,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Great questions; I'll do my best to reply inline:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <
> > >>>>>>> bbej...@gmail.com>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Jim,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the KIP.  I have a couple of meta-questions
> as
> > >>>>>>>> well:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 1) Regarding pausing only a subset of running instances,
> > >>>>> I'm
> > >>>>>>>>>>> thinking
> > >>>>>>>>>>>>>> there
> > >>>>>>>>>>>>>>>> may be a use case for pausing all of them.
> > >>>>>>>>>>>>>>>>       Would it make sense to also allow for pausing all
> > >>>>>>>> instances
> > >>>>>>>>> by
> > >>>>>>>>>>>>>> adding a
> > >>>>>>>>>>>>>>>> method `pauseAll()` or something similar?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Honestly, I'm indifferent on this point.  Presently, I
> > >>>>> think
> > >>>>>>>>> what I
> > >>>>>>>>>>>>> have
> > >>>>>>>>>>>>>>> proposed is the minimal change to get the ability to
> pause
> > >>>>>>> and
> > >>>>>>>>>>> resume
> > >>>>>>>>>>>>>>> processing.  If adding a 'pauseAll()' is required, I'd be
> > >>>>>>> happy
> > >>>>>>>>> to
> > >>>>>>>>>>> do
> > >>>>>>>>>>>>>> that!
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>    From Guozhang's email, it sounds like this would
> require
> > >>>>>>> using
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>> rebalance protocol to trigger the coordination.  Would
> > >>>>> there
> > >>>>>>> be
> > >>>>>>>>>>> enough
> > >>>>>>>>>>>>>> room
> > >>>>>>>>>>>>>>> in that approach to indicate that a named topology is to
> > >>>>> be
> > >>>>>>>>> paused
> > >>>>>>>>>>>>> across
> > >>>>>>>>>>>>>>> all nodes?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 2) Would pausing affect standby tasks?  For example,
> > >>>>> imagine
> > >>>>>>>>> there
> > >>>>>>>>>>>>> are 3
> > >>>>>>>>>>>>>>>> instances A, B, and C.
> > >>>>>>>>>>>>>>>>       A user elects to pause instance C only but it
> hosts
> > >>>>> the
> > >>>>>>>>> standby
> > >>>>>>>>>>>>>> tasks
> > >>>>>>>>>>>>>>>> for A.
> > >>>>>>>>>>>>>>>>       Would the standby tasks on the paused application
> > >>>>>>> continue
> > >>>>>>>>> to
> > >>>>>>>>>>> read
> > >>>>>>>>>>>>>> from
> > >>>>>>>>>>>>>>>> the changelog topic?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Yes, standby tasks would continue reading from the
> > >>>>> changelog
> > >>>>>>>>> topic.
> > >>>>>>>>>>>>> All
> > >>>>>>>>>>>>>>> consumers would continue reading to avoid getting dropped
> > >>>>>>> from
> > >>>>>>>>> their
> > >>>>>>>>>>>>>>> consumer groups.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Jim
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 2:44 PM Jim Hughes
> > >>>>>>>>>>>>> <jhug...@confluent.io.invalid
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for the feedback; responses inline below:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <
> > >>>>>>>>> wangg...@gmail.com>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hello Jim,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks for the proposed KIP. I have some meta
> questions
> > >>>>>>>> about
> > >>>>>>>>> it:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 1) Would an instance always pause/resume all of its
> > >>>>>>> current
> > >>>>>>>>> owned
> > >>>>>>>>>>>>>>>>>> topologies (i.e. the named topologies), or are there
> > >>>>> any
> > >>>>>>>>>>> scenarios
> > >>>>>>>>>>>>>>>> where
> > >>>>>>>>>>>>>>>>> we
> > >>>>>>>>>>>>>>>>>> only want to pause/resume a subset of them?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> An instance may wish to pause some of its named
> > >>>>> topologies.
> > >>>>>>>> I
> > >>>>>>>>> was
> > >>>>>>>>>>>>>> unsure
> > >>>>>>>>>>>>>>>>> what to say about named topologies in the KIP since
> they
> > >>>>>>> seem
> > >>>>>>>>> to
> > >>>>>>>>>>> be
> > >>>>>>>>>>>>> an
> > >>>>>>>>>>>>>>>>> internal detail at the moment.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I intend to add to KafkaStreamsNamedTopologyWrapper
> > >>>>> methods
> > >>>>>>>>> like:
> > >>>>>>>>>>>>>>>>>        public void pauseNamedTopology(final String
> > >>>>>>>>> topologyToPause)
> > >>>>>>>>>>>>>>>>>        public boolean isNamedTopologyPaused(final
> String
> > >>>>>>>>> topology)
> > >>>>>>>>>>>>>>>>>        public void resumeNamedTopology(final String
> > >>>>>>>>>>> topologyToResume)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 2) From a user's perspective, do we want to always
> > >>>>> issue a
> > >>>>>>>>>>>>>>>> `pause/resume`
> > >>>>>>>>>>>>>>>>>> to all the instances or not? For example, we can
> define
> > >>>>>>> the
> > >>>>>>>>>>>>> semantics
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>>> the function as "you only need to call this function
> on
> > >>>>>>> any
> > >>>>>>>> of
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> application's instances, and all instances would then
> > >>>>>>> pause
> > >>>>>>>>> (via
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> rebalance error codes)", or as "you would call this
> > >>>>>>> function
> > >>>>>>>>> for
> > >>>>>>>>>>> all
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> instances of an application". Which one are you
> > >>>>> referring
> > >>>>>>>> to?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> My initial intent is that one would call this function
> > >>>>> on
> > >>>>>>> any
> > >>>>>>>>>>>>> instances
> > >>>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>> the application that one wishes to pause.  This should
> > >>>>>>> allow
> > >>>>>>>>> more
> > >>>>>>>>>>>>>> control
> > >>>>>>>>>>>>>>>>> (in case one wanted to pause a portion of the
> > >>>>> instances).
> > >>>>>>> On
> > >>>>>>>>> the
> > >>>>>>>>>>>>> other
> > >>>>>>>>>>>>>>>>> hand, this approach would put more work on the
> > >>>>> implementer
> > >>>>>>> to
> > >>>>>>>>>>>>>> coordinate
> > >>>>>>>>>>>>>>>>> calling pause or resume across instances.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If the other option is more suitable, happy to do that
> > >>>>>>>> instead.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 3) With EOS, there's a transaction timeout which would
> > >>>>>>>>> determine
> > >>>>>>>>>>> how
> > >>>>>>>>>>>>>>>>> long a
> > >>>>>>>>>>>>>>>>>> transaction can stay idle before it's force-aborted on
> > >>>>> the
> > >>>>>>>>> broker
> > >>>>>>>>>>>>>>>> side. I
> > >>>>>>>>>>>>>>>>>> think when a pause is issued, that means we'd need to
> > >>>>>>>>> immediately
> > >>>>>>>>>>>>>>>> commit
> > >>>>>>>>>>>>>>>>>> the current transaction for EOS since we do not know
> > >>>>> how
> > >>>>>>>> long
> > >>>>>>>>> we
> > >>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>>>>> pause for. Is that right? If yes could you please
> > >>>>> clarify
> > >>>>>>>>> that in
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> doc
> > >>>>>>>>>>>>>>>>>> as well.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Good point.  My intent is for pause() to wait for the
> > >>>>> next
> > >>>>>>>>>>> iteration
> > >>>>>>>>>>>>>>>>> through `runOnce()` and then only skip over the
> > >>>>> processing
> > >>>>>>>> for
> > >>>>>>>>>>> paused
> > >>>>>>>>>>>>>>>> tasks
> > >>>>>>>>>>>>>>>>> in `taskManager.process(numIterations, time)`.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Do commits live inside that call or do they live
> > >>>>>>>>> across/outside of
> > >>>>>>>>>>>>> it?
> > >>>>>>>>>>>>>>>> In
> > >>>>>>>>>>>>>>>>> the former case, I think there shouldn't be any issues
> > >>>>> with
> > >>>>>>>>> EOS.
> > >>>>>>>>>>>>>>>>> Otherwise, we may need to work through some details to
> > >>>>> get
> > >>>>>>>> EOS
> > >>>>>>>>>>> right.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Once we figure that out, I can update the KIP.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Jim
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes
> > >>>>>>>>>>>>>>>> <jhug...@confluent.io.invalid
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I have written up a KIP for adding the ability to
> > >>>>> pause
> > >>>>>>> and
> > >>>>>>>>>>> resume
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>> processing of a topology in AK Streams.  The KIP is
> > >>>>> here:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks in advance for your feedback!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Jim
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> --
> > >>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
