I'm in favor of the "just pausing the instance itself" option as well. As
for EOS, the point is that while processing is paused we would not
trigger any `producer.send`, so an open transaction could sit idle until
the transaction timeout aborts it. That's why I think it's better to also
commit the current transaction before we pause processing.
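
A minimal sketch of that ordering, assuming a toy processor class
(`PausableProcessor` and its methods are hypothetical stand-ins, not Kafka
Streams internals): pause() commits whatever is in the open transaction
before it stops processing, so no transaction is left idle across the
pause.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of one task's EOS flow; not the real Kafka Streams code.
class PausableProcessor {
    private final List<String> pendingInTxn = new ArrayList<>(); // sent in the open transaction
    private final List<String> committed = new ArrayList<>();    // sent in committed transactions
    private boolean paused = false;

    void process(String record) {
        if (paused) {
            return; // no producer.send while paused, so nothing new enters a transaction
        }
        pendingInTxn.add(record); // stands in for producer.send(...) inside an open transaction
    }

    void commit() {
        committed.addAll(pendingInTxn); // stands in for commitTransaction()
        pendingInTxn.clear();
    }

    void pause() {
        // Commit first: the pause may outlast the transaction timeout, after which
        // the broker would abort a transaction left open.
        commit();
        paused = true;
    }

    void resume() {
        paused = false;
    }

    boolean hasOpenTransaction() {
        return !pendingInTxn.isEmpty();
    }

    List<String> committed() {
        return committed;
    }
}
```

Records processed before the pause end up committed, and nothing is sent
until resume() is called.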


Guozhang

On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

> Hi Matthias,
>
> Since processing the topology is the only thing that will be paused, I
> think we can let commits happen naturally.
>
> Good point about getting the paused state to new members; it seems like
> the "building block" approach is a good way to keep things simple at
> first.
>
> Cheers,
>
> Jim
>
> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > I think it's tricky to propagate a pauseAll() via the rebalance
> > protocol. Would new members joining the group need to get paused, too?
> > Could there be weird race conditions with overlapping pauseAll() and
> > resumeAll() calls on different instances, while there could be errors,
> > network partitions, or similar?
> >
> > I would argue that, similar to IQ, we provide the basic building
> > blocks and leave it to users to implement cross-instance management
> > for a pauseAll() scenario. Also, if there is real demand, we can
> > always add pauseAll()/resumeAll() as follow-up work.
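
With only per-instance building blocks, a user-side pauseAll() could look
roughly like this. `PausableInstance`, `LocalInstance` and
`PauseCoordinator` are hypothetical names for illustration only. The loop
reports the instances it failed to pause instead of assuming the whole
application is paused, which is one way to surface the error and
partition cases raised above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface mirroring a per-instance pause()/resume()/isPaused().
interface PausableInstance {
    void pause();
    void resume();
    boolean isPaused();
}

// Toy in-process implementation; "unreachable" simulates a partitioned instance.
class LocalInstance implements PausableInstance {
    private volatile boolean paused;
    private final boolean reachable;

    LocalInstance(boolean reachable) {
        this.reachable = reachable;
    }

    public void pause() {
        if (!reachable) {
            throw new IllegalStateException("instance unreachable");
        }
        paused = true;
    }

    public void resume() {
        paused = false;
    }

    public boolean isPaused() {
        return paused;
    }
}

final class PauseCoordinator {
    private PauseCoordinator() {}

    // Tries to pause every known instance and returns those that failed,
    // so the caller can retry or alert rather than assume success.
    static List<PausableInstance> pauseAll(List<? extends PausableInstance> instances) {
        List<PausableInstance> failed = new ArrayList<>();
        for (PausableInstance instance : instances) {
            try {
                instance.pause();
            } catch (RuntimeException e) {
                failed.add(instance);
            }
        }
        return failed;
    }
}
```

Keeping this loop in user code leaves retry and membership policy (e.g.
pausing newly started instances) to the application.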
> >
> > About named topologies: I agree with Jim not to include them in this
> > KIP, as they are not a public feature yet. If we make named topologies
> > public, the corresponding KIP should extend the pause/resume feature
> > (i.e., APIs) accordingly. Of course, the code can (and should) already
> > be set up to support it, to be future-proof.
> >
> > Good call-out about commit and EOS. To simplify it, I think it might
> > be good to commit in the at-least-once case, too?
> >
> >
> > -Matthias
> >
> >
> > On 5/6/22 1:05 PM, Jim Hughes wrote:
> > > Hi Bill,
> > >
> > > Great questions; I'll do my best to reply inline:
> > >
> > > On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > >
> > >> Hi Jim,
> > >>
> > >> Thanks for the KIP.  I have a couple of meta-questions as well:
> > >>
> > >> 1) Regarding pausing only a subset of running instances, I'm thinking
> > >> there may be a use case for pausing all of them.
> > >>     Would it make sense to also allow for pausing all instances by
> > >> adding a method `pauseAll()` or something similar?
> > >>
> > >
> > > Honestly, I'm indifferent on this point.  Presently, I think what I
> > > have proposed is the minimal change to get the ability to pause and
> > > resume processing.  If adding a `pauseAll()` is required, I'd be happy
> > > to do that!
> > >
> > > From Guozhang's email, it sounds like this would require using the
> > > rebalance protocol to trigger the coordination.  Would there be enough
> > > room in that approach to indicate that a named topology is to be paused
> > > across all nodes?
> > >
> > >
> > >> 2) Would pausing affect standby tasks?  For example, imagine there
> > >> are 3 instances A, B, and C.
> > >>     A user elects to pause instance C only, but it hosts the standby
> > >> tasks for A.
> > >>     Would the standby tasks on the paused instance continue to read
> > >> from the changelog topic?
> > >>
> > >
> > > Yes, standby tasks would continue reading from the changelog topic.
> > > All consumers would continue reading to avoid getting dropped from
> > > their consumer groups.
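
A toy model of that behavior, assuming pause only gates active-task
processing (`PausedInstanceModel` and its methods are hypothetical):
changelog records for standby tasks are still applied while the instance
is paused, so their state stays warm for a fail-over.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: pausing stops active processing only.
final class PausedInstanceModel {
    private boolean paused = false;
    private final Map<String, String> standbyState = new HashMap<>(); // key -> latest changelog value
    private int activeRecordsProcessed = 0;

    void pause() {
        paused = true;
    }

    // Records polled from input topics for active tasks; skipped while paused.
    void onActiveRecords(List<String> records) {
        if (!paused) {
            activeRecordsProcessed += records.size();
        }
    }

    // Records polled from changelog topics for standby tasks; applied regardless of pause.
    void onChangelogRecord(String key, String value) {
        standbyState.put(key, value);
    }

    int activeRecordsProcessed() {
        return activeRecordsProcessed;
    }

    Map<String, String> standbyState() {
        return standbyState;
    }
}
```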
> > >
> > > Cheers,
> > >
> > > Jim
> > >
> > >
> > >
> > >
> > >> Thanks!
> > >> Bill
> > >>
> > >>
> > >> On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid>
> > >> wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Thanks for the feedback; responses inline below:
> > >>>
> > >>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hello Jim,
> > >>>>
> > >>>> Thanks for the proposed KIP. I have some meta questions about it:
> > >>>>
> > >>>> 1) Would an instance always pause/resume all of its currently owned
> > >>>> topologies (i.e. the named topologies), or are there any scenarios
> > >>>> where we only want to pause/resume a subset of them?
> > >>>>
> > >>>
> > >>> An instance may wish to pause some of its named topologies.  I was
> > >>> unsure what to say about named topologies in the KIP since they seem
> > >>> to be an internal detail at the moment.
> > >>>
> > >>> I intend to add methods like these to KafkaStreamsNamedTopologyWrapper:
> > >>>      public void pauseNamedTopology(final String topologyToPause)
> > >>>      public boolean isNamedTopologyPaused(final String topology)
> > >>>      public void resumeNamedTopology(final String topologyToResume)
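
A rough sketch of the bookkeeping such methods might need, assuming paused
topologies are tracked by name in a thread-safe set
(`NamedTopologyPauseRegistry` is a hypothetical class; the wrapper's real
internals may differ). User threads update the set while stream threads
only read it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry backing pause/resume/isPaused for named topologies.
final class NamedTopologyPauseRegistry {
    // Concurrent set: written by user threads, read by stream threads each iteration.
    private final Set<String> paused = ConcurrentHashMap.newKeySet();

    void pauseNamedTopology(final String topologyToPause) {
        paused.add(topologyToPause);
    }

    void resumeNamedTopology(final String topologyToResume) {
        paused.remove(topologyToResume);
    }

    boolean isNamedTopologyPaused(final String topology) {
        return paused.contains(topology);
    }
}
```

A stream thread would then skip processing for any task whose owning
topology reports `isNamedTopologyPaused(...) == true`.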
> > >>>
> > >>>
> > >>>
> > >>>> 2) From a user's perspective, do we want to always issue a
> > >>>> `pause/resume` to all the instances or not? For example, we can
> > >>>> define the semantics of the function as "you only need to call this
> > >>>> function on any of the application's instances, and all instances
> > >>>> would then pause (via the rebalance error codes)", or as "you would
> > >>>> call this function for all the instances of an application". Which
> > >>>> one are you referring to?
> > >>>>
> > >>>
> > >>> My initial intent is that one would call this function on each
> > >>> instance of the application that one wishes to pause.  This should
> > >>> allow more control (in case one wanted to pause a portion of the
> > >>> instances).  On the other hand, this approach would put more work on
> > >>> the implementer to coordinate calling pause or resume across
> > >>> instances.
> > >>>
> > >>> If the other option is more suitable, happy to do that instead.
> > >>>
> > >>>
> > >>>> 3) With EOS, there's a transaction timeout which determines how
> > >>>> long a transaction can stay idle before it's force-aborted on the
> > >>>> broker side. I think that when a pause is issued, we'd need to
> > >>>> immediately commit the current transaction for EOS, since we do not
> > >>>> know how long we could pause for. Is that right? If yes, could you
> > >>>> please clarify that in the doc as well?
> > >>>>
> > >>>
> > >>> Good point.  My intent is for pause() to wait for the next iteration
> > >>> through `runOnce()` and then only skip over the processing for
> > >>> paused tasks in `taskManager.process(numIterations, time)`.
> > >>>
> > >>> Do commits live inside that call or do they live across/outside of
> > >>> it?  In the former case, I think there shouldn't be any issues with
> > >>> EOS.  Otherwise, we may need to work through some details to get EOS
> > >>> right.
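
A heavily simplified model of that loop, under the assumption that
committing happens outside the process step (`SimplifiedStreamThread` and
the phase names are hypothetical; this is not the actual `StreamThread`
code). The pause flag is read once per iteration, so a pause takes effect
at the next `runOnce()`, while polling and committing keep running.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one stream thread's loop; records which phases ran.
final class SimplifiedStreamThread {
    private volatile boolean pauseRequested = false; // written by the user's thread
    private final List<String> log = new ArrayList<>();

    void pause() {
        pauseRequested = true;
    }

    void resume() {
        pauseRequested = false;
    }

    void runOnce() {
        final boolean paused = pauseRequested; // read once: pause applies at an iteration boundary
        log.add("poll");                       // always poll so the consumer stays in its group
        if (!paused) {
            log.add("process");                // stands in for taskManager.process(numIterations, time)
        }
        log.add("maybeCommit");                // assumed outside process(), so commits continue while paused
    }

    List<String> log() {
        return log;
    }
}
```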
> > >>>
> > >>> Once we figure that out, I can update the KIP.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jim
> > >>>
> > >>>
> > >>>
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>> I have written up a KIP for adding the ability to pause and resume
> > >>>>> the processing of a topology in AK Streams.  The KIP is here:
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> > >>>>>
> > >>>>> Thanks in advance for your feedback!
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Jim
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >
> >
>


-- 
-- Guozhang
