I think for named topologies we can leave the scope of this KIP as "all or
nothing", i.e. when you pause an instance you pause all of its topologies.
I raised this question in my previous email just to clarify whether this
is what you have in mind. We can leave the question of finer-grained
pausing behavior for later, when named topologies are exposed via
another KIP.


Guozhang

On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Jim,
>
> Thanks for the replies. This all sounds good to me. Just two further
> comments:
>
> 3. It seems like you should aim for the simplest semantics. If the intent
> is to “pause” the instance, then you’d better pause the whole instance. If
> you leave punctuations and standbys running, I expect we’d see bug reports
> come in that the instance isn’t really paused.
>
> 5. Since you won the race to write a KIP, I don’t think it makes much
> sense to worry too much about modular topologies. When they propose their
> KIP, they will have to specify a lot of state management behavior, and
> pause/resume will have to be part of it. If they have some concern about
> your KIP, they’ll chime in. It doesn’t make sense for you to try and guess
> what that proposal will look like.
>
> To be honest, you’re proposing a KafkaStreams runtime-level pause/resume
> function, not a topology-level one anyway, so it seems pretty clear that it
> would pause the whole runtime (of a single instance) regardless of any
> modular topologies. If the intent is to pause individual topologies in the
> future, you’d need a different API anyway.
>
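John's point 3 argues that a paused instance should stop punctuating as well. The following is a minimal sketch of a wall-clock punctuation check that honours an instance-level pause flag. The class name, the flag, and the choice to fire only once after a pause (skipping the intervals missed while paused) are assumptions for illustration, not how Kafka Streams actually schedules punctuators.

```java
// Hypothetical sketch: a wall-clock punctuation schedule that does nothing
// while the whole instance is paused. Not the Kafka Streams implementation.
final class PausablePunctuationSchedule {
    private final long intervalMs;
    private long nextDueMs;
    private int firedCount = 0;

    PausablePunctuationSchedule(final long intervalMs, final long startMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
        this.nextDueMs = startMs + intervalMs;
    }

    // Called once per loop iteration with the current wall-clock time.
    void maybePunctuate(final long nowMs, final boolean instancePaused) {
        if (instancePaused || nowMs < nextDueMs) {
            return; // a paused instance runs no punctuations at all
        }
        firedCount++; // stand-in for invoking the user's Punctuator
        // Fire once, then skip any intervals missed while paused
        // (a design choice of this sketch).
        final long missedIntervals = (nowMs - nextDueMs) / intervalMs;
        nextDueMs += (missedIntervals + 1) * intervalMs;
    }

    int firedCount() { return firedCount; }

    long nextDueMs() { return nextDueMs; }
}
```

Whether missed punctuations should be replayed or skipped after a resume would be one more behavior for the KIP to spell out.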
> Thanks!
> -John
>
> On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:
> > Hi John,
> >
> > Long emails are great; responding inline!
> >
> > On Sat, May 7, 2022 at 4:54 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Thanks for the KIP, Jim!
> >>
> >> This conversation seems to highlight that the KIP needs to specify
> >> some of its behavior as well as its APIs, where the behavior is
> >> observable and significant to users.
> >>
> >> For example:
> >>
> >> 1. Do you plan to have a guarantee that immediately after
> >> calling KafkaStreams.pause(), users should observe that the instance
> >> stops processing new records? Or should they expect that the threads
> >> will continue to process some records and pause asynchronously
> >> (you already answered this in the thread earlier)?
> >>
> >
> > I'm happy to build up to a guarantee of sorts.  My current idea is that
> > pause() does not do anything "exceptional" to get control back from a
> > running topology.  A currently running topology would get to complete its
> > loop.
> >
> > Separately, I'm still piecing together how commits work.  By some
> > mechanism, after a pause, I do agree that the topology needs to commit its
> > work in some manner.
> >
> >
> >> 2. Will the threads continue to poll new records until they naturally fill
> >> up the task buffers, or will they immediately pause their Consumers
> >> as well?
> >>
> >
> > Presently, I'm suggesting that consumers would fill up their buffers.
> >
> >
> >> 3. Will threads continue to call (system time) punctuators, or would
> >> punctuations also be paused?
> >>
> >
> > In my first pass at thinking through this, I left the punctuators running.
> > To be honest, I'm not sure what they do, so my approach is either lucky and
> > correct or it could be Very Clearly Wrong. ;)
> >
> >
> >> I realize that some of those questions simply may not have occurred to
> >> you, so this is not a criticism for leaving them off; I'm just pointing out
> >> that although we don't tend to mention implementation details in KIPs,
> >> we also can't be too high level, since there are a lot of operational
> >> details that users rely on to achieve various behaviors in Streams.
> >>
> >
> > Ayup, I will add some details as we iron out the guarantees and the
> > implementation details that are at the API level.  This one is tough since
> > internal features like NamedTopologies are part of the discussion.
> >
> >
> >
> >> A couple more comments:
> >>
> >> 4. +1 to what Guozhang said. It seems like we should also do a commit
> >> before entering the paused state. That way, any open transactions would
> >> be closed and not have to worry about timing out. Even under ALOS, it
> >> seems best to go ahead and complete the processing of in-flight records
> >> by committing. That way, if anything happens to die while it's paused,
> >> existing work won't have to be repeated. Plus, if there are any processors
> >> with side effects, users won't have to tolerate weird edge cases where a
> >> pause occurs after a processor sees a record, but before the result is
> >> sent to its outputs.
> >>
> >> 5. I noticed that you proposed not to add a PAUSED state, but I didn't
> >> follow the rationale. Adding a state seems beneficial for a number of
> >> reasons: StreamThreads already use the thread state to determine whether
> >> to process or not, so avoiding a new State would just mean adding a
> >> separate flag to track and then checking your new flag in addition to the
> >> State in the thread. Also, operating Streams applications is a
> >> non-trivial task, and users rely on the State (and transitions) to
> >> understand Streams's behavior. Adding a PAUSED state is an elegant way to
> >> communicate to operators what is happening with the application. Note
> >> that the person digging through logs and metrics, trying to understand
> >> why the application isn't doing anything, is probably not going to be
> >> the same person who is calling pause() and resume(). Also, if you add a
> >> state, you don't need `isPaused()`.
> >>
> >> 5b. If you buy the arguments to go ahead and commit as well as the
> >> argument to add a State, then I'd also suggest following the existing
> >> patterns for the shutdown states by also adding PAUSING. That way, you'll
> >> also expose a way to understand that Streams received the signal to
> >> pause, and that it's still processing and committing some records in
> >> preparation to enter a PAUSED state. I'm not sure if a RESUMING state
> >> would also make sense.
> >>
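Points 5 and 5b suggest PAUSING and PAUSED states. The following is a minimal sketch of that state machine under stated assumptions: the enum `ProposedState` and the class `StateHolder` are hypothetical, the real `KafkaStreams.State` has more values (such as REBALANCING and ERROR) that are omitted here, and resume goes straight from PAUSED back to RUNNING because a RESUMING state was left open.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified state machine with the PAUSING/PAUSED states
// proposed in this thread; the real KafkaStreams.State has more values.
enum ProposedState {
    RUNNING, PAUSING, PAUSED, PENDING_SHUTDOWN, NOT_RUNNING;

    // States reachable from this one in a single transition.
    Set<ProposedState> validNextStates() {
        switch (this) {
            case RUNNING:
                return EnumSet.of(PAUSING, PENDING_SHUTDOWN);
            case PAUSING: // still committing in-flight work
                return EnumSet.of(PAUSED, PENDING_SHUTDOWN);
            case PAUSED: // no RESUMING state in this sketch
                return EnumSet.of(RUNNING, PENDING_SHUTDOWN);
            case PENDING_SHUTDOWN:
                return EnumSet.of(NOT_RUNNING);
            default:
                return EnumSet.noneOf(ProposedState.class);
        }
    }

    boolean isValidTransition(final ProposedState next) {
        return validNextStates().contains(next);
    }
}

final class StateHolder {
    private ProposedState state = ProposedState.RUNNING;

    // Returns false (and keeps the current state) for an invalid transition.
    synchronized boolean setState(final ProposedState next) {
        if (!state.isValidTransition(next)) {
            return false;
        }
        state = next;
        return true;
    }

    synchronized ProposedState state() {
        return state;
    }
}
```

With the state exposed, an operator can tell from `state() == PAUSED` alone that the instance is paused, which is why a separate `isPaused()` would become redundant.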
> >
> > I hit a tricky bit when thinking through having a PAUSED state...  If one
> > is using Named Topologies, and some of them are paused, what state is the
> > Streams instance in?  If we can agree on that, things may become clear....
> > I can see two quick ideas:
> >
> > 1.  The state is RUNNING and NamedTopologies have some other way to
> > indicate state.
> >
> > 2.  The state is something messy like PARTIALLY_PAUSED to reflect that the
> > instance has something interesting going on.
> >
> > When I poked at things initially, I did try out having different states,
> > and I readily agree that a PAUSING state may make sense.  (Especially if
> > there's a need to run commits before transitioning all the way to PAUSED.)
> >
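Jim's second idea can be made concrete with a small function that derives an instance-level value from per-topology pause flags. This is only a sketch under assumptions: the enum `InstancePauseView`, its PARTIALLY_PAUSED value, and the map from topology name to flag are hypothetical. Guozhang's reply at the top of the thread proposes keeping pause all-or-nothing instead.

```java
import java.util.Map;

// Hypothetical illustration of "idea 2": summarising per-topology pause
// flags into one instance-level value. Map values must be non-null.
enum InstancePauseView { RUNNING, PARTIALLY_PAUSED, PAUSED }

final class PauseViews {
    private PauseViews() {}

    // pausedByTopology maps a named topology to whether it is paused.
    static InstancePauseView summarize(final Map<String, Boolean> pausedByTopology) {
        final long pausedCount = pausedByTopology.values().stream()
                .filter(Boolean::booleanValue)
                .count();
        if (pausedCount == 0) {
            return InstancePauseView.RUNNING; // also covers "no topologies"
        }
        if (pausedCount == pausedByTopology.size()) {
            return InstancePauseView.PAUSED;
        }
        return InstancePauseView.PARTIALLY_PAUSED;
    }
}
```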
> >
> >
> >> And that's all I have to say about that. I hope you don't find my
> >> long message off-putting. I'm fundamentally in favor of your KIP,
> >> and I think with a little more explanation in the KIP, and a few
> >> small tweaks to the proposal, we'll be able to provide good
> >> ergonomics to our users.
> >>
> >
> > Thanks!
> >
> > Jim
> >
> >
> >> Thanks,
> >> -John
> >>
> >> On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
> >> > I'm in favor of the "just pausing the instance itself" option as well.
> >> > As for EOS, the point is that when the processing is paused, we would
> >> > not trigger any `producer.send` during the time, and the transaction
> >> > timeout is sort of relying on that behavior, so my point was that it's
> >> > probably better to also commit the processing before we pause it.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid>
> >> > wrote:
> >> >
> >> >> Hi Matthias,
> >> >>
> >> >> Since the only thing which will be paused is processing the topology, I
> >> >> think we can let commits happen naturally.
> >> >>
> >> >> Good point about getting the paused state to new members; it seems
> >> >> like the "building block" approach is a good one to keep things simple
> >> >> at first.
> >> >>
> >> >> Cheers,
> >> >>
> >> >> Jim
> >> >>
> >> >> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:
> >> >>
> >> >> > I think it's tricky to propagate a pauseAll() via the rebalance
> >> >> > protocol. Would new members joining the group need to get paused, too?
> >> >> > Could there be weird race conditions with overlapping pauseAll() and
> >> >> > resumeAll() calls on different instances while there could be errors /
> >> >> > network partitions or similar?
> >> >> >
> >> >> > I would argue that, similar to IQ, we provide the basic building
> >> >> > blocks and leave it to the users to implement cross-instance
> >> >> > management for a pauseAll() scenario. -- Also, if there is really
> >> >> > demand, we can always add pauseAll()/resumeAll() as follow-up work.
> >> >> >
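Matthias's "building blocks" suggestion leaves cross-instance pausing to users. The following is a minimal sketch of what that user-side code might look like, assuming each instance is reachable through a hypothetical `PausableInstance` handle (for example, something the application exposes in front of each KafkaStreams object). It deliberately ignores the harder issues raised above, such as new members joining and network partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle to one application instance; not a Kafka Streams API.
interface PausableInstance {
    String id();
    void pause();
    void resume();
}

final class PauseAllHelper {
    private PauseAllHelper() {}

    // Best effort: pause every instance and report the ids that failed.
    // No retries, no ordering guarantees, and no handling of instances
    // that join after this call returns.
    static List<String> pauseAll(final List<? extends PausableInstance> instances) {
        final List<String> failed = new ArrayList<>();
        for (final PausableInstance instance : instances) {
            try {
                instance.pause();
            } catch (final RuntimeException e) {
                failed.add(instance.id());
            }
        }
        return failed;
    }
}

// In-memory test double used to exercise the helper.
final class InMemoryInstance implements PausableInstance {
    private final String id;
    private final boolean failOnPause;
    private boolean paused = false;

    InMemoryInstance(final String id, final boolean failOnPause) {
        this.id = id;
        this.failOnPause = failOnPause;
    }

    @Override public String id() { return id; }

    @Override public void pause() {
        if (failOnPause) {
            throw new IllegalStateException("instance " + id + " unreachable");
        }
        paused = true;
    }

    @Override public void resume() { paused = false; }

    boolean isPaused() { return paused; }
}
```

Returning the failed ids, rather than throwing on the first failure, leaves the retry or rollback policy to the caller, which is where this approach puts the coordination burden anyway.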
> >> >> > About named topologies: I agree with Jim to not include them in this
> >> >> > KIP as they are not a public feature yet. If we make named topologies
> >> >> > public, the corresponding KIP should extend the pause/resume feature
> >> >> > (ie, APIs) accordingly. Of course, the code can (and should) already
> >> >> > be set up to support it to be future-proof.
> >> >> >
> >> >> > Good call out about commit and EOS -- to simplify it, I think it might
> >> >> > be good to commit also for the at-least-once case?
> >> >> >
> >> >> >
> >> >> > -Matthias
> >> >> >
> >> >> >
> >> >> > On 5/6/22 1:05 PM, Jim Hughes wrote:
> >> >> > > Hi Bill,
> >> >> > >
> >> >> > > Great questions; I'll do my best to reply inline:
> >> >> > >
> >> >> > > On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >> >> > >
> >> >> > >> Hi Jim,
> >> >> > >>
> >> >> > >> Thanks for the KIP.  I have a couple of meta-questions as well:
> >> >> > >>
> >> >> > >> 1) Regarding pausing only a subset of running instances, I'm thinking
> >> >> > >> there may be a use case for pausing all of them.
> >> >> > >>     Would it make sense to also allow for pausing all instances by
> >> >> > >> adding a method `pauseAll()` or something similar?
> >> >> > >>
> >> >> > >
> >> >> > > Honestly, I'm indifferent on this point.  Presently, I think what I
> >> >> > > have proposed is the minimal change to get the ability to pause and
> >> >> > > resume processing.  If adding a 'pauseAll()' is required, I'd be happy
> >> >> > > to do that!
> >> >> > >
> >> >> > > From Guozhang's email, it sounds like this would require using the
> >> >> > > rebalance protocol to trigger the coordination.  Would there be enough
> >> >> > > room in that approach to indicate that a named topology is to be
> >> >> > > paused across all nodes?
> >> >> > >
> >> >> > >
> >> >> > >> 2) Would pausing affect standby tasks?  For example, imagine there
> >> >> > >> are 3 instances A, B, and C.
> >> >> > >>     A user elects to pause instance C only but it hosts the standby
> >> >> > >> tasks for A.
> >> >> > >>     Would the standby tasks on the paused application continue to
> >> >> > >> read from the changelog topic?
> >> >> > >>
> >> >> > >
> >> >> > > Yes, standby tasks would continue reading from the changelog topic.
> >> >> > > All consumers would continue reading to avoid getting dropped from
> >> >> > > their consumer groups.
> >> >> > >
> >> >> > > Cheers,
> >> >> > >
> >> >> > > Jim
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > >> Thanks!
> >> >> > >> Bill
> >> >> > >>
> >> >> > >>
> >> >> > >> On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid>
> >> >> > >> wrote:
> >> >> > >>
> >> >> > >>> Hi Guozhang,
> >> >> > >>>
> >> >> > >>> Thanks for the feedback; responses inline below:
> >> >> > >>>
> >> >> > >>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > >>>
> >> >> > >>>> Hello Jim,
> >> >> > >>>>
> >> >> > >>>> Thanks for the proposed KIP. I have some meta questions about it:
> >> >> > >>>>
> >> >> > >>>> 1) Would an instance always pause/resume all of its currently owned
> >> >> > >>>> topologies (i.e. the named topologies), or are there any scenarios
> >> >> > >>>> where we only want to pause/resume a subset of them?
> >> >> > >>>>
> >> >> > >>>
> >> >> > >>> An instance may wish to pause some of its named topologies.  I was
> >> >> > >>> unsure what to say about named topologies in the KIP since they seem
> >> >> > >>> to be an internal detail at the moment.
> >> >> > >>>
> >> >> > >>> I intend to add methods like these to KafkaStreamsNamedTopologyWrapper:
> >> >> > >>>      public void pauseNamedTopology(final String topologyToPause)
> >> >> > >>>      public boolean isNamedTopologyPaused(final String topology)
> >> >> > >>>      public void resumeNamedTopology(final String topologyToResume)
> >> topologyToResume)
> >> >> > >>>
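The three wrapper methods listed above need only a small amount of shared state. The following is a minimal sketch, assuming the wrapper keeps a thread-safe set of paused topology names that the stream threads consult before processing a task. The class name `NamedTopologyPauseTracker` and the `shouldProcess` hook are hypothetical, and `KafkaStreamsNamedTopologyWrapper` itself is internal.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping behind pauseNamedTopology / isNamedTopologyPaused /
// resumeNamedTopology. Writes come from user threads and reads come from
// stream threads, so a concurrent set is used.
final class NamedTopologyPauseTracker {
    private final Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

    public void pauseNamedTopology(final String topologyToPause) {
        pausedTopologies.add(topologyToPause);
    }

    public boolean isNamedTopologyPaused(final String topology) {
        return pausedTopologies.contains(topology);
    }

    public void resumeNamedTopology(final String topologyToResume) {
        pausedTopologies.remove(topologyToResume);
    }

    // Hook a stream thread could call before processing a task that
    // belongs to the given named topology.
    boolean shouldProcess(final String topologyOfTask) {
        return !pausedTopologies.contains(topologyOfTask);
    }
}
```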
> >> >> > >>>
> >> >> > >>>
> >> >> > >>>> 2) From a user's perspective, do we want to always issue a
> >> >> > >>>> `pause/resume` to all the instances or not? For example, we can
> >> >> > >>>> define the semantics of the function as "you only need to call this
> >> >> > >>>> function on any of the application's instances, and all instances
> >> >> > >>>> would then pause (via the rebalance error codes)", or as "you would
> >> >> > >>>> call this function for all the instances of an application". Which
> >> >> > >>>> one are you referring to?
> >> >> > >>>>
> >> >> > >>>
> >> >> > >>> My initial intent is that one would call this function on any
> >> >> > >>> instances of the application that one wishes to pause.  This should
> >> >> > >>> allow more control (in case one wanted to pause a portion of the
> >> >> > >>> instances).  On the other hand, this approach would put more work on
> >> >> > >>> the implementer to coordinate calling pause or resume across
> >> >> > >>> instances.
> >> >> > >>>
> >> >> > >>> If the other option is more suitable, happy to do that instead.
> >> >> > >>>
> >> >> > >>>
> >> >> > >>>> 3) With EOS, there's a transaction timeout which would determine how
> >> >> > >>>> long a transaction can stay idle before it's force-aborted on the
> >> >> > >>>> broker side. I think when a pause is issued, that means we'd need to
> >> >> > >>>> immediately commit the current transaction for EOS since we do not
> >> >> > >>>> know how long we could pause for. Is that right? If yes could you
> >> >> > >>>> please clarify that in the doc as well.
> >> >> > >>>>
> >> >> > >>>
> >> >> > >>> Good point.  My intent is for pause() to wait for the next iteration
> >> >> > >>> through `runOnce()` and then only skip over the processing for paused
> >> >> > >>> tasks in `taskManager.process(numIterations, time)`.
> >> >> > >>>
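Jim's plan (observe the pause on the next `runOnce()` iteration and skip only the processing step), combined with the commit-before-pause that Guozhang, Matthias, and John ask for, can be sketched as a toy loop. Everything here is a hypothetical simplification: `PausableLoopSketch` is not the real StreamThread or TaskManager, records are plain strings, a "commit" only increments a counter, the `polled` list stands in for `consumer.poll()`, and the buffer is unbounded, whereas a real implementation would bound it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one stream thread's loop; not the real StreamThread/TaskManager.
final class PausableLoopSketch {
    private final int commitEveryRecords;
    private final List<String> buffered = new ArrayList<>();
    private volatile boolean pauseRequested = false; // set by user threads
    private volatile boolean paused = false;         // set by the loop only
    private int processedCount = 0;
    private int uncommitted = 0;
    private int commitCount = 0;

    PausableLoopSketch(final int commitEveryRecords) {
        this.commitEveryRecords = commitEveryRecords;
    }

    // Asynchronous: takes effect on the next runOnce() iteration.
    void pause() { pauseRequested = true; }

    void resume() { pauseRequested = false; }

    // One loop iteration; 'polled' stands in for the result of consumer.poll().
    void runOnce(final List<String> polled) {
        // Keep polling even while paused so the consumer is not considered
        // failed (max.poll.interval.ms); records just accumulate.
        buffered.addAll(polled);
        if (pauseRequested) {
            if (!paused) {
                commit();      // flush in-flight work before idling
                paused = true; // roughly PAUSING -> PAUSED
            }
            return;            // skip processing while paused
        }
        paused = false;
        processedCount += buffered.size(); // stand-in for running the topology
        uncommitted += buffered.size();
        buffered.clear();
        if (uncommitted >= commitEveryRecords) {
            commit();
        }
    }

    // Stand-in for a commit; under EOS this would also end the open transaction.
    private void commit() {
        if (uncommitted > 0) {
            commitCount++;
            uncommitted = 0;
        }
    }

    boolean isPaused() { return paused; }

    int processedCount() { return processedCount; }

    int bufferedCount() { return buffered.size(); }

    int commitCount() { return commitCount; }
}
```

Committing once on the transition into the paused state, rather than periodically while paused, means an EOS transaction is never left open for an unknown pause duration.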
> >> >> > >>> Do commits live inside that call or do they live across/outside of
> >> >> > >>> it? In the former case, I think there shouldn't be any issues with
> >> >> > >>> EOS. Otherwise, we may need to work through some details to get EOS
> >> >> > >>> right.
> >> >> > >>>
> >> >> > >>> Once we figure that out, I can update the KIP.
> >> >> > >>>
> >> >> > >>> Thanks,
> >> >> > >>>
> >> >> > >>> Jim
> >> >> > >>>
> >> >> > >>>
> >> >> > >>>
> >> >> > >>>>
> >> >> > >>>>
> >> >> > >>>> Guozhang
> >> >> > >>>>
> >> >> > >>>>
> >> >> > >>>>
> >> >> > >>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid>
> >> >> > >>>> wrote:
> >> >> > >>>>
> >> >> > >>>>> Hi all,
> >> >> > >>>>>
> >> >> > >>>>> I have written up a KIP for adding the ability to pause and resume
> >> >> > >>>>> the processing of a topology in AK Streams.  The KIP is here:
> >> >> > >>>>>
> >> >> > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> >> >> > >>>>>
> >> >> > >>>>> Thanks in advance for your feedback!
> >> >> > >>>>>
> >> >> > >>>>> Cheers,
> >> >> > >>>>>
> >> >> > >>>>> Jim
> >> >> > >>>>>
> >> >> > >>>>
> >> >> > >>>>
> >> >> > >>>> --
> >> >> > >>>> -- Guozhang
> >> >> > >>>>
> >> >> > >>>
> >> >> > >>
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
>


-- 
-- Guozhang
