Hi Matthias,

I like it.  I've updated the KIP to reflect that detail; I put the details
in the docs for pause.
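
A rough sketch of the pause-before-start semantics discussed here, using a hypothetical stand-in class rather than the real KafkaStreams client. `PausableClient`, `runOnce()`, and `processedCount()` are invented for illustration; only `pause()`, `resume()`, `isPaused()`, and `start()` mirror names from this thread, and the actual implementation may differ:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Streams client; NOT the real KafkaStreams API.
final class PausableClient {
    // User-specified flag, kept separate from any runtime state machine.
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private boolean started = false;
    private int processed = 0;

    // Allowed before start(): it only flips a flag, so a caller can start
    // the client in a paused state without racing against start().
    void pause() { paused.set(true); }

    void resume() { paused.set(false); }

    // Reflects the flag immediately after pause()/resume() is called.
    boolean isPaused() { return paused.get(); }

    void start() { started = true; }

    // One loop iteration: "processing" is skipped while paused or not started.
    void runOnce() {
        if (started && !paused.get()) {
            processed++;
        }
    }

    int processedCount() { return processed; }
}
```

Calling `pause()` and then `start()` on this stand-in leaves the client started but idle until `resume()` is called, which is the race-free startup Matthias describes below.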

Cheers,

Jim

On Tue, May 10, 2022 at 7:51 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP. Overall LGTM.
>
> Can we clarify one question: would it be allowed to call `pause()`
> before calling `start()`? I don't see any reason why we would need to
> disallow it?
>
> It could be helpful to start a KafkaStreams client in paused state --
> otherwise there is a race between calling `start()` and calling `pause()`.
>
> If we allow it, we should clearly document it.
>
>
> -Matthias
>
> On 5/10/22 12:04 PM, Jim Hughes wrote:
> > Hi Bill, all,
> >
> > Thank you.  I've updated the KIP to reflect pausing standby tasks as
> well.
> > I think all the outstanding points have been addressed and I'm going to
> > start the vote thread!
> >
> > Cheers,
> >
> > Jim
> >
> >
> >
> > On Tue, May 10, 2022 at 2:43 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> >> Hi Jim,
> >>
> >> After reading the comments on the KIP, I agree that it makes sense to
> pause
> >> all activities and any changes can be made later on.
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Tue, May 10, 2022 at 4:03 AM Bruno Cadonna <cado...@apache.org>
> wrote:
> >>
> >>> Hi Jim,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>> I am fine with the KIP in general.
> >>>
> >>> However, I am with Sophie and John to also pause the standbys for the
> >>> reasons they brought up. Is there a specific reason you want to keep
> >>> standbys going? It feels like premature optimization to me. We can
> still
> >>> add keeping standbys running in a follow-up if needed.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 10.05.22 05:15, Sophie Blee-Goldman wrote:
> >>>> Thanks Jim, just one note/question on the standby tasks:
> >>>>
> >>>> At the minute, my moderately held position is that standby tasks ought
> >> to
> >>>>> continue reading and remain caught up.  If standby tasks would run
> out
> >>> of
> >>>>> space, there are probably bigger problems.
> >>>>
> >>>>
> >>>> For a single node application, or when the #pause API is invoked on
> all
> >>>> instances,
> >>>> then there won't be any further active processing and thus nothing to
> >>> keep
> >>>> up with,
> >>>> right? So for that case, it's just a matter of whether any standbys
> >> that
> >>>> are lagging
> >>>> will have the chance to catch up to the (paused) active task state
> >> before
> >>>> they stop
> >>>> as well, in which case having them continue feels fine to me. However
> >>> this
> >>>> is a
> >>>> relatively trivial benefit and I would only consider it as a deciding
> >>>> factor when all
> >>>> things are equal otherwise.
> >>>>
> >>>> My concern is the more interesting case: when this feature is used to
> >>> pause
> >>>> only
> >>>> one node, or some subset of the overall application. In this case,
> >> yes,
> >>>> the standby
> >>>> tasks will indeed fall out of sync. But the only reason I can imagine
> >>>> someone using
> >>>> the pause feature in such a way is because there is something going
> >>> wrong,
> >>>> or about
> >>>> to go wrong, on that particular node. For example as mentioned above,
> >> if
> >>>> the user
> >>>> wants to cut down on costs without stopping everything, or if the node
> >> is
> >>>> about to
> >>>> run out of disk or needs to be debugged or so on. And in this case,
> >>>> continuing to
> >>>> process the standby tasks while other instances continue to run would
> >>>> pretty much
> >>>> defeat the purpose of pausing it entirely, and might have unpleasant
> >>>> consequences
> >>>> for the unsuspecting developer.
> >>>>
> >>>> All that said, I don't want to block this KIP so if you have strong
> >>>> feelings about the
> >>>> standby behavior I'm happy to back down. I'm only pushing back now
> >>> because
> >>>> it
> >>>> felt like there wasn't any particular motivation for the standbys to
> >>>> continue processing
> >>>> or not, and I figured I'd try to fill in this gap with my thoughts on
> >> the
> >>>> matter :)
> >>>> Either way we should just make sure that this behavior is documented
> >>>> clearly,
> >>>> since it may be surprising if we decide to only pause active
> processing
> >>>> (another option
> >>>> is to rename the method something like #pauseProcessing or
> >>>> #pauseActiveProcessing
> >>>> so that it's hard to miss).
> >>>>
> >>>> Thanks! Sorry for the lengthy response, but hopefully we won't need to
> >>>> debate this any
> >>>> further. Beyond this I'm satisfied with the latest proposal.
> >>>>
> >>>> On Mon, May 9, 2022 at 5:16 PM John Roesler <vvcep...@apache.org>
> >> wrote:
> >>>>
> >>>>> Thanks for the updates, Jim!
> >>>>>
> >>>>> After this discussion and your updates, this KIP looks good to me.
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:
> >>>>>> Hi Sophie, all,
> >>>>>>
> >>>>>> I've updated the KIP with feedback from the discussion so far:
> >>>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> >>>>>>
> >>>>>> As a terse summary of my current position:
> >>>>>> Pausing will only stop processing and punctuation (respecting
> modular
> >>>>>> topologies).
> >>>>>> Paused topologies will still a) consume from input topics, b) call
> >> the
> >>>>>> usual commit pathways (commits will happen basically as they would
> >>> have),
> >>>>>> and c) process standby tasks.
> >>>>>>
> >>>>>> Shout if the KIP or those details still need some TLC.  Responding
> to
> >>>>>> Sophie inline below.
> >>>>>>
> >>>>>>
> >>>>>> On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman
> >>>>>> <sop...@confluent.io.invalid> wrote:
> >>>>>>
> >>>>>>> Don't worry, I'm going to be adding the APIs for topology-level
> >>> pausing
> >>>>> as
> >>>>>>> part of the modular topologies KIP,
> >>>>>>> so we don't need to worry about that for now. That said, I don't
> >> think
> >>>>> we
> >>>>>>> should brush it off entirely and design
> >>>>>>> this feature in a way that's going to be incompatible or hugely
> >> raise
> >>>>> the
> >>>>>>> LOE on bringing the (mostly already
> >>>>>>> implemented) modular topologies feature into the public API, just
> >>>>>>> because it "won the race to write a KIP" :)
> >>>>>>>
> >>>>>>
> >>>>>> Yes, I'm hoping that this is all compatible with modular
> >> topologies.  I
> >>>>>> haven't seen anything so far which seems to be a problem; this KIP
> is
> >>>>> just
> >>>>>> in a weird state to discuss details of acting on modular
> >> topologies. :)
> >>>>>>
> >>>>>>
> >>>>>>> I may be biased (ok, I definitely am), but I'm not in favor of
> >> adding
> >>>>> this
> >>>>>>> as a state regardless of the modular topologies.
> >>>>>>> First of all any change to the KafkaStreams state machine is a
> >>> breaking
> >>>>>>> change, no? So we would have to wait until
> >>>>>>> the next major release which seems like an unnecessary thing to
> >> block
> >>>>> on.
> >>>>>>> (Whether to add this as a state to the
> >>>>>>> StreamThread's FSM is an implementation detail).
> >>>>>>>
> >>>>>>
> >>>>>> +1.  I am sold on skipping out on new states.  I had that as a
> >> rejected
> >>>>>> alternative in the KIP and have added a few more words to that bit.
> >>>>>>
> >>>>>>
> >>>>>>> Also, the semantics of using an `isPaused` method to distinguish a
> >>>>> paused
> >>>>>>> instance (or topology) make more sense
> >>>>>>> to me -- this is a user-specified status, whereas the KafkaStreams
> >>>>> state is
> >>>>>>> intended to relay the status of the system
> >>>>>>> itself. For example, if we are going to continue to poll during
> >> pause,
> >>>>> then
> >>>>>>> shouldn't the client transition to REBALANCING?
> >>>>>>> I believe it makes sense to still allow distinguishing these states
> >>>>> while a
> >>>>>>> client is paused, whereas making PAUSED its
> >>>>>>> own state means you can't tell when the client is rebalancing vs
> >>>>> running,
> >>>>>>> or whether it is paused or dead: presumably
> >>>>>>> the NOT_RUNNING/ERROR state would trump the PAUSED state, which
> >> means
> >>>>> you
> >>>>>>> would not be able to rely on
> >>>>>>> checking the state to see if you had called `pause()` on that
> instance.
> >>>>>>> Obviously you can work around this by just
> >>>>>>> maintaining a flag in the usercode, but all this feels very
> >> unnatural
> >>>>> to me
> >>>>>>> vs just checking the `#isPaused` API.
> >>>>>>>
> >>>>>>> On that note, I had one question -- at what point would the
> >>> `#isPaused`
> >>>>>>> check return true? Would it do so immediately
> >>>>>>> after pausing the instance, or only once it has finished committing
> >>>>> offsets
> >>>>>>> and stopped returning records?
> >>>>>>>
> >>>>>>
> >>>>>> Immediately, `#isPaused` tells you about metadata.
> >>>>>>
> >>>>>>
> >>>>>>> Finally, on the note of punctuators I think it would make most
> sense
> >>> to
> >>>>>>> either pause these as well or else add this as
> >>>>>>> an explicit option for the user. If this feature is used to, for
> >>>>> example,
> >>>>>>> help save on processing costs while an app is
> >>>>>>> not in use, then it would probably be surprising and perhaps
> >> alarming
> >>> to
> >>>>>>> see certain kinds of processing still continue.
> >>>>>>>
> >>>>>>
> >>>>>>   From other parts of the discussion, I'm sold on pausing
> punctuation.
> >>>>>>
> >>>>>>
> >>>>>>> The question of whether to continue fetching for standby tasks is
> >>> maybe
> >>>>> a
> >>>>>>> bit more debatable, as it would certainly be
> >>>>>>> nice to find your clients all caught up when you go to resume the
> >>>>> instance
> >>>>>>> again, but I would still strongly suggest
> >>>>>>> pausing these as well. To use a similar example, imagine if you
> >> paused
> >>>>> an
> >>>>>>> app because it was about to run out of
> >>>>>>> disk. If the standbys kept processing and filled up the remaining
> >>> space,
> >>>>>>> you'd probably feel a bit betrayed by this API.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>
> >>>>>> At the minute, my moderately held position is that standby tasks
> >> ought
> >>> to
> >>>>>> continue reading and remain caught up.  If standby tasks would run
> >> out
> >>> of
> >>>>>> space, there are probably bigger problems.
> >>>>>>
> >>>>>> If later it is desirable to manage punctuation or standby tasks,
> then
> >>> it
> >>>>>> should be easy for future folks to modify things.
> >>>>>>
> >>>>>> Overall, I'd frame this KIP as "pause processing resulting in
> >> outputs".
> >>>>>>
> >>>>>> Cheers,
> >>>>>>
> >>>>>> Jim
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> I think for named topology we can leave the scope of this KIP as
> >> "all
> >>>>> or
> >>>>>>>> nothing", i.e. when you pause an instance you pause all of its
> >>>>>>> topologies.
> >>>>>>>> I raised this question in my previous email just trying to clarify
> >> if
> >>>>>>> this
> >>>>>>>> is what you have in mind. We can leave the question of finer
> >>>>> controlled
> >>>>>>>> pausing behavior for later when we have named topology being
> >> exposed
> >>>>> via
> >>>>>>>> another KIP.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Jim,
> >>>>>>>>>
> >>>>>>>>> Thanks for the replies. This all sounds good to me. Just two
> >> further
> >>>>>>>>> comments:
> >>>>>>>>>
> >>>>>>>>> 3. It seems like you should aim for the simplest semantics. If
> the
> >>>>>>> intent
> >>>>>>>>> is to “pause” the instance, then you’d better pause the whole
> >>>>> instance.
> >>>>>>>> If
> >>>>>>>>> you leave punctuations and standbys running, I expect we’d see
> bug
> >>>>>>>> reports
> >>>>>>>>> come in that the instance isn’t really paused.
> >>>>>>>>>
> >>>>>>>>> 5. Since you won the race to write a KIP, I don’t think it makes
> >> too
> >>>>>>> much
> >>>>>>>>> sense to worry too much about modular topologies. When they
> >> propose
> >>>>>>> their
> >>>>>>>>> KIP, they will have to specify a lot of state management
> behavior,
> >>>>> and
> >>>>>>>>> pause/resume will have to be part of it. If they have some
> concern
> >>>>>>> about
> >>>>>>>>> your KIP, they’ll chime in. It doesn’t make sense for you to try
> >> and
> >>>>>>>> guess
> >>>>>>>>> what that proposal will look like.
> >>>>>>>>>
> >>>>>>>>> To be honest, you’re proposing a KafkaStreams runtime-level
> >>>>>>> pause/resume
> >>>>>>>>> function, not a topology-level one anyway, so it seems pretty
> >> clear
> >>>>>>> that
> >>>>>>>> it
> >>>>>>>>> would pause the whole runtime (of a single instance) regardless
> of
> >>>>> any
> >>>>>>>>> modular topologies. If the intent is to pause individual
> >> topologies
> >>>>> in
> >>>>>>>> the
> >>>>>>>>> future, you’d need a different API anyway.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:
> >>>>>>>>>> Hi John,
> >>>>>>>>>>
> >>>>>>>>>> Long emails are great; responding inline!
> >>>>>>>>>>
> >>>>>>>>>> On Sat, May 7, 2022 at 4:54 PM John Roesler <
> vvcep...@apache.org
> >>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the KIP, Jim!
> >>>>>>>>>>>
> >>>>>>>>>>> This conversation seems to highlight that the KIP needs to
> >>>>> specify
> >>>>>>>>>>> some of its behavior as well as its APIs, where the behavior is
> >>>>>>>>>>> observable and significant to users.
> >>>>>>>>>>>
> >>>>>>>>>>> For example:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. Do you plan to have a guarantee that immediately after
> >>>>>>>>>>> calling KafkaStreams.pause(), users should observe that the
> >>>>> instance
> >>>>>>>>>>> stops processing new records? Or should they expect that the
> >>>>> threads
> >>>>>>>>>>> will continue to process some records and pause asynchronously
> >>>>>>>>>>> (you already answered this in the thread earlier)?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I'm happy to build up to a guarantee of sorts.  My current idea
> >> is
> >>>>>>> that
> >>>>>>>>>> pause() does not do anything "exceptional" to get control back
> >>>>> from a
> >>>>>>>>>> running topology.  A currently running topology would get to
> >>>>> complete
> >>>>>>>> its
> >>>>>>>>>> loop.
> >>>>>>>>>>
> >>>>>>>>>> Separately, I'm still piecing together how commits work.  By
> some
> >>>>>>>>>> mechanism, after a pause, I do agree that the topology needs to
> >>>>>>> commit
> >>>>>>>>> its
> >>>>>>>>>> work in some manner.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 2. Will the threads continue to poll new records until they
> >>>>>>> naturally
> >>>>>>>>> fill
> >>>>>>>>>>> up the task buffers, or will they immediately pause their
> >>>>> Consumers
> >>>>>>>>>>> as well?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Presently, I'm suggesting that consumers would fill up their
> >>>>> buffers.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> 3. Will threads continue to call (system time) punctuators, or
> >>>>> would
> >>>>>>>>>>> punctuations also be paused?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> In my first pass at thinking through this, I left the
> punctuators
> >>>>>>>>> running.
> >>>>>>>>>> To be honest, I'm not sure what they do, so my approach is
> either
> >>>>>>> lucky
> >>>>>>>>> and
> >>>>>>>>>> correct or it could be Very Clearly Wrong. ;)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> I realize that some of those questions simply may not have
> >>>>> occurred
> >>>>>>> to
> >>>>>>>>>>> you, so this is not a criticism for leaving them off; I'm just
> >>>>>>>> pointing
> >>>>>>>>> out
> >>>>>>>>>>> that although we don't tend to mention implementation details
> in
> >>>>>>> KIPs,
> >>>>>>>>>>> we also can't be too high level, since there are a lot of
> >>>>>>> operational
> >>>>>>>>>>> details that users rely on to achieve various behaviors in
> >>>>> Streams.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Ayup, I will add some details as we iron out the guarantees,
> >>>>>>>>> implementation
> >>>>>>>>>> details that are at the API level.  This one is tough since
> >>>>> internal
> >>>>>>>>>> features like NamedTopologies are part of the discussion.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> A couple more comments:
> >>>>>>>>>>>
> >>>>>>>>>>> 4. +1 to what Guozhang said. It seems like we should also do
> >> a
> >>>>>>>> commit
> >>>>>>>>>>> before entering the paused state. That way, any open
> >> transactions
> >>>>>>>> would
> >>>>>>>>>>> be closed and not have to worry about timing out. Even under
> >>>>> ALOS,
> >>>>>>> it
> >>>>>>>>>>> seems best to go ahead and complete the processing of in-flight
> >>>>>>>> records
> >>>>>>>>>>> by committing. That way, if anything happens to die while it's
> >>>>>>> paused,
> >>>>>>>>>>> existing
> >>>>>>>>>>> work won't have to be repeated. Plus, if there are any
> >> processors
> >>>>>>> with
> >>>>>>>>> side
> >>>>>>>>>>> effects, users won't have to tolerate weird edge cases where a
> >>>>> pause
> >>>>>>>>> occurs
> >>>>>>>>>>> after a processor sees a record, but before the result is sent
> >> to
> >>>>>>> its
> >>>>>>>>>>> outputs.
> >>>>>>>>>>>
> >>>>>>>>>>> 5. I noticed that you proposed not to add a PAUSED state, but I
> >>>>>>> didn't
> >>>>>>>>>>> follow
> >>>>>>>>>>> the rationale. Adding a state seems beneficial for a number of
> >>>>>>>> reasons:
> >>>>>>>>>>> StreamThreads already use the thread state to determine whether
> >>>>> to
> >>>>>>>>> process
> >>>>>>>>>>> or not, so avoiding a new State would just mean adding a
> >> separate
> >>>>>>> flag
> >>>>>>>>> to
> >>>>>>>>>>> track
> >>>>>>>>>>> and then checking your new flag in addition to the State in the
> >>>>>>>> thread.
> >>>>>>>>>>> Also,
> >>>>>>>>>>> operating Streams applications is a non-trivial task, and users
> >>>>> rely
> >>>>>>>> on
> >>>>>>>>>>> the State
> >>>>>>>>>>> (and transitions) to understand Streams's behavior. Adding a
> >>>>> PAUSED
> >>>>>>>>> state
> >>>>>>>>>>> is an elegant way to communicate to operators what is happening
> >>>>> with
> >>>>>>>> the
> >>>>>>>>>>> application. Note that the person digging through logs and
> >>>>> metrics,
> >>>>>>>>> trying
> >>>>>>>>>>> to understand why the application isn't doing anything is
> >>>>> probably
> >>>>>>> not
> >>>>>>>>>>> going
> >>>>>>>>>>> to be the same person who is calling pause() and resume().
> Also,
> >>>>> if
> >>>>>>>> you
> >>>>>>>>> add
> >>>>>>>>>>> a state, you don't need `isPaused()`.
> >>>>>>>>>>>
> >>>>>>>>>>> 5b. If you buy the arguments to go ahead and commit as well as
> >>>>> the
> >>>>>>>>>>> argument to add a State, then I'd also suggest to follow the
> >>>>>>> existing
> >>>>>>>>>>> patterns
> >>>>>>>>>>> for the shutdown states by also adding PAUSING. That
> >>>>>>>>>>> way, you'll also expose a way to understand that Streams
> >> received
> >>>>>>> the
> >>>>>>>>>>> signal
> >>>>>>>>>>> to pause, and that it's still processing and committing some
> >>>>> records
> >>>>>>>> in
> >>>>>>>>>>> preparation to enter a PAUSED state. I'm not sure if a RESUMING
> >>>>>>> state
> >>>>>>>>> would
> >>>>>>>>>>> also make sense.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I hit a tricky bit when thinking through having a PAUSED
> >>>>> state...  If
> >>>>>>>> one
> >>>>>>>>>> is using Named Topologies, and some of them are paused, what
> >>>>> state is
> >>>>>>>> the
> >>>>>>>>>> Streams instance in?  If we can agree on that, things may become
> >>>>>>>>> clear....
> >>>>>>>>>> I can see two quick ideas:
> >>>>>>>>>>
> >>>>>>>>>> 1.  The state is RUNNING and NamedTopologies have some other way
> >>>>> to
> >>>>>>>>>> indicate state.
> >>>>>>>>>>
> >>>>>>>>>> 2.  The state is something messy like PARTIALLY_PAUSED to
> reflect
> >>>>>>> that
> >>>>>>>>> the
> >>>>>>>>>> instance has something interesting going on.
> >>>>>>>>>>
> >>>>>>>>>> When I poked at things initially, I did try out having different
> >>>>>>>> states,
> >>>>>>>>>> and I readily agree that a PAUSING state may make sense.
> >>>>> (Especially
> >>>>>>>> if
> >>>>>>>>>> there's a need to run commits before transitioning all the way
> to
> >>>>>>>>> PAUSED.)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> And that's all I have to say about that. I hope you don't find
> >> my
> >>>>>>>>>>> long message offputting. I'm fundamentally in favor of your
> KIP,
> >>>>>>>>>>> and I think with a little more explanation in the KIP, and a
> few
> >>>>>>>>>>> small tweaks to the proposal, we'll be able to provide good
> >>>>>>>>>>> ergonomics to our users.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>>
> >>>>>>>>>> Jim
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> -John
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
> >>>>>>>>>>>> I'm in favor of the "just pausing the instance itself" option
> >>>>> as
> >>>>>>>>> well. As
> >>>>>>>>>>>> for EOS, the point is that when the processing is paused, we
> >>>>> would
> >>>>>>>> not
> >>>>>>>>>>>> trigger any `producer.send` during the time, and the
> >>>>> transaction
> >>>>>>>>> timeout
> >>>>>>>>>>> is
> >>>>>>>>>>>> sort of relying on that behavior, so my point was that it's
> >>>>>>> probably
> >>>>>>>>>>> better
> >>>>>>>>>>>> to also commit the processing before we pause it.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, May 6, 2022 at 6:12 PM Jim Hughes
> >>>>>>>>> <jhug...@confluent.io.invalid>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Since the only thing which will be paused is processing the
> >>>>>>>>> topology, I
> >>>>>>>>>>>>> think we can let commits happen naturally.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Good point about getting the paused state to new members; it
> >>>>> is
> >>>>>>>>> seeming
> >>>>>>>>>>>>> like the "building block" approach is a good one to keep
> >>>>> things
> >>>>>>>>> simple
> >>>>>>>>>>> at
> >>>>>>>>>>>>> first.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <
> >>>>> mj...@apache.org
> >>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think it's tricky to propagate a pauseAll() via the
> >>>>> rebalance
> >>>>>>>>>>>>>> protocol. New members joining the group would need to get
> >>>>>>> paused,
> >>>>>>>>> too?
> >>>>>>>>>>>>>> Could there be weird race conditions with overlapping
> >>>>>>> pauseAll()
> >>>>>>>>> and
> >>>>>>>>>>>>>> resumeAll() calls on different instances while there could
> >>>>> be
> >>>>>>>>>>> errors /
> >>>>>>>>>>>>>> network partitions or similar?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would argue that similar to IQ, we provide the basic
> >>>>> building
> >>>>>>>>>>> blocks,
> >>>>>>>>>>>>>> and leave it to the users to implement cross instance
> >>>>>>>> management
> >>>>>>>>>>> for a
> >>>>>>>>>>>>>> pauseAll() scenario. -- Also, if there is really demand, we
> >>>>> can
> >>>>>>>>> always
> >>>>>>>>>>>>>> add pauseAll()/resumeAll() as follow up work.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About named topologies: I agree with Jim to not include them
> >>>>> in
> >>>>>>>> this
> >>>>>>>>> KIP
> >>>>>>>>>>>>>> as they are not a public feature yet. If we make named
> >>>>>>> topologies
> >>>>>>>>>>>>>> public, the corresponding KIP should extend the pause/resume
> >>>>>>>>> feature
> >>>>>>>>>>>>>> (ie, APIs) accordingly. Of course, the code can (and should)
> >>>>>>>>> already
> >>>>>>>>>>> be
> >>>>>>>>>>>>>> set up to support it to be future-proof.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good call out about commit and EOS -- to simplify it, I
> >>>>> think
> >>>>>>> it
> >>>>>>>>> might
> >>>>>>>>>>>>>> be good to commit also for the at-least-once case?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 5/6/22 1:05 PM, Jim Hughes wrote:
> >>>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Great questions; I'll do my best to reply inline:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <
> >>>>>>> bbej...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jim,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP.  I have a couple of meta-questions as
> >>>>>>>> well:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) Regarding pausing only a subset of running instances,
> >>>>> I'm
> >>>>>>>>>>> thinking
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>> may be a use case for pausing all of them.
> >>>>>>>>>>>>>>>>       Would it make sense to also allow for pausing all
> >>>>>>>> instances
> >>>>>>>>> by
> >>>>>>>>>>>>>> adding a
> >>>>>>>>>>>>>>>> method `pauseAll()` or something similar?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Honestly, I'm indifferent on this point.  Presently, I
> >>>>> think
> >>>>>>>>> what I
> >>>>>>>>>>>>> have
> >>>>>>>>>>>>>>> proposed is the minimal change to get the ability to pause
> >>>>>>> and
> >>>>>>>>>>> resume
> >>>>>>>>>>>>>>> processing.  If adding a 'pauseAll()' is required, I'd be
> >>>>>>> happy
> >>>>>>>>> to
> >>>>>>>>>>> do
> >>>>>>>>>>>>>> that!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>    From Guozhang's email, it sounds like this would require
> >>>>>>> using
> >>>>>>>>> the
> >>>>>>>>>>>>>>> rebalance protocol to trigger the coordination.  Would
> >>>>> there
> >>>>>>> be
> >>>>>>>>>>> enough
> >>>>>>>>>>>>>> room
> >>>>>>>>>>>>>>> in that approach to indicate that a named topology is to
> >>>>> be
> >>>>>>>>> paused
> >>>>>>>>>>>>> across
> >>>>>>>>>>>>>>> all nodes?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2) Would pausing affect standby tasks?  For example,
> >>>>> imagine
> >>>>>>>>> there
> >>>>>>>>>>>>> are 3
> >>>>>>>>>>>>>>>> instances A, B, and C.
> >>>>>>>>>>>>>>>>       A user elects to pause instance C only but it hosts
> >>>>> the
> >>>>>>>>> standby
> >>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>> for A.
> >>>>>>>>>>>>>>>>       Would the standby tasks on the paused application
> >>>>>>> continue
> >>>>>>>>> to
> >>>>>>>>>>> read
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>> the changelog topic?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yes, standby tasks would continue reading from the
> >>>>> changelog
> >>>>>>>>> topic.
> >>>>>>>>>>>>> All
> >>>>>>>>>>>>>>> consumers would continue reading to avoid getting dropped
> >>>>>>> from
> >>>>>>>>> their
> >>>>>>>>>>>>>>> consumer groups.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 2:44 PM Jim Hughes
> >>>>>>>>>>>>> <jhug...@confluent.io.invalid
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the feedback; responses inline below:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <
> >>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hello Jim,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for the proposed KIP. I have some meta questions
> >>>>>>>> about
> >>>>>>>>> it:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1) Would an instance always pause/resume all of its
> >>>>>>> current
> >>>>>>>>> owned
> >>>>>>>>>>>>>>>>>> topologies (i.e. the named topologies), or are there
> >>>>> any
> >>>>>>>>>>> scenarios
> >>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> only want to pause/resume a subset of them?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> An instance may wish to pause some of its named
> >>>>> topologies.
> >>>>>>>> I
> >>>>>>>>> was
> >>>>>>>>>>>>>> unsure
> >>>>>>>>>>>>>>>>> what to say about named topologies in the KIP since they
> >>>>>>> seem
> >>>>>>>>> to
> >>>>>>>>>>> be
> >>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> internal detail at the moment.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I intend to add to KafkaStreamsNamedTopologyWrapper
> >>>>> methods
> >>>>>>>>> like:
> >>>>>>>>>>>>>>>>>        public void pauseNamedTopology(final String
> >>>>>>>>> topologyToPause)
> >>>>>>>>>>>>>>>>>        public boolean isNamedTopologyPaused(final String
> >>>>>>>>> topology)
> >>>>>>>>>>>>>>>>>        public void resumeNamedTopology(final String
> >>>>>>>>>>> topologyToResume)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2) From a user's perspective, do we want to always
> >>>>> issue a
> >>>>>>>>>>>>>>>> `pause/resume`
> >>>>>>>>>>>>>>>>>> to all the instances or not? For example, we can define
> >>>>>>> the
> >>>>>>>>>>>>> semantics
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the function as "you only need to call this function on
> >>>>>>> any
> >>>>>>>> of
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> application's instances, and all instances would then
> >>>>>>> pause
> >>>>>>>>> (via
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> rebalance error codes)", or as "you would call this
> >>>>>>> function
> >>>>>>>>> for
> >>>>>>>>>>> all
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> instances of an application". Which one are you
> >>>>> referring
> >>>>>>>> to?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> My initial intent is that one would call this function
> >>>>> on
> >>>>>>> any
> >>>>>>>>>>>>> instances
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the application that one wishes to pause.  This should
> >>>>>>> allow
> >>>>>>>>> more
> >>>>>>>>>>>>>> control
> >>>>>>>>>>>>>>>>> (in case one wanted to pause a portion of the
> >>>>> instances).
> >>>>>>> On
> >>>>>>>>> the
> >>>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>> hand, this approach would put more work on the
> >>>>> implementer
> >>>>>>> to
> >>>>>>>>>>>>>> coordinate
> >>>>>>>>>>>>>>>>> calling pause or resume across instances.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If the other option is more suitable, happy to do that
> >>>>>>>> instead.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3) With EOS, there's a transaction timeout which would
> >>>>>>>>> determine
> >>>>>>>>>>> how
> >>>>>>>>>>>>>>>>> long a
> >>>>>>>>>>>>>>>>>> transaction can stay idle before it's force-aborted on
> >>>>> the
> >>>>>>>>> broker
> >>>>>>>>>>>>>>>> side. I
> >>>>>>>>>>>>>>>>>> think when a pause is issued, that means we'd need to
> >>>>>>>>> immediately
> >>>>>>>>>>>>>>>> commit
> >>>>>>>>>>>>>>>>>> the current transaction for EOS since we do not know
> >>>>> how
> >>>>>>>> long
> >>>>>>>>> we
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>> pause for. Is that right? If yes could you please
> >>>>> clarify
> >>>>>>>>> that in
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> doc
> >>>>>>>>>>>>>>>>>> as well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Good point.  My intent is for pause() to wait for the
> >>>>> next
> >>>>>>>>>>> iteration
> >>>>>>>>>>>>>>>>> through `runOnce()` and then only skip over the
> >>>>> processing
> >>>>>>>> for
> >>>>>>>>>>> paused
> >>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>> in `taskManager.process(numIterations, time)`.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Do commits live inside that call or do they live
> >>>>>>>>> across/outside of
> >>>>>>>>>>>>> it?
> >>>>>>>>>>>>>>>> In
> >>>>>>>>>>>>>>>>> the former case, I think there shouldn't be any issues
> >>>>> with
> >>>>>>>>> EOS.
> >>>>>>>>>>>>>>>>> Otherwise, we may need to work through some details to
> >>>>> get
> >>>>>>>> EOS
> >>>>>>>>>>> right.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Once we figure that out, I can update the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes
> >>>>>>>>>>>>>>>> <jhug...@confluent.io.invalid
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I have written up a KIP for adding the ability to
> >>>>> pause
> >>>>>>> and
> >>>>>>>>>>> resume
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> processing of a topology in AK Streams.  The KIP is
> >>>>> here:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks in advance for your feedback!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
