Hi Matthias,

I like it. I've updated the KIP to reflect that detail; I put the details in
the docs for pause.

Cheers,

Jim

On Tue, May 10, 2022 at 7:51 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP. Overall LGTM.
>
> Can we clarify one question: would it be allowed to call `pause()`
> before calling `start()`? I don't see any reason why we would need to
> disallow it?
>
> It could be helpful to start a KafkaStreams client in paused state --
> otherwise there is a race between calling `start()` and calling `pause()`.
>
> If we allow it, we should clearly document it.
>
>
> -Matthias
>
> On 5/10/22 12:04 PM, Jim Hughes wrote:
> > Hi Bill, all,
> >
> > Thank you. I've updated the KIP to reflect pausing standby tasks as well.
> > I think all the outstanding points have been addressed and I'm going to
> > start the vote thread!
> >
> > Cheers,
> >
> > Jim
> >
> > On Tue, May 10, 2022 at 2:43 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> >> Hi Jim,
> >>
> >> After reading the comments on the KIP, I agree that it makes sense to
> >> pause all activities and any changes can be made later on.
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Tue, May 10, 2022 at 4:03 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >>> Hi Jim,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>> I am fine with the KIP in general.
> >>>
> >>> However, I am with Sophie and John to also pause the standbys for the
> >>> reasons they brought up. Is there a specific reason you want to keep
> >>> standbys going? It feels like premature optimization to me. We can still
> >>> add keeping standby running in a follow up if needed.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 10.05.22 05:15, Sophie Blee-Goldman wrote:
> >>>> Thanks Jim, just one note/question on the standby tasks:
> >>>>
> >>>>> At the minute, my moderately held position is that standby tasks ought
> >>>>> to continue reading and remain caught up. If standby tasks would run
> >>>>> out of space, there are probably bigger problems.
> >>>>
> >>>> For a single node application, or when the #pause API is invoked on all
> >>>> instances, then there won't be any further active processing and thus
> >>>> nothing to keep up with, right? So for that case, it's just a matter of
> >>>> whether any standbys that are lagging will have the chance to catch up
> >>>> to the (paused) active task state before they stop as well, in which
> >>>> case having them continue feels fine to me. However this is a relatively
> >>>> trivial benefit and I would only consider it as a deciding factor when
> >>>> all things are equal otherwise.
> >>>>
> >>>> My concern is the more interesting case: when this feature is used to
> >>>> pause only one node, or some subset of the overall application. In this
> >>>> case, yes, the standby tasks will indeed fall out of sync. But the only
> >>>> reason I can imagine someone using the pause feature in such a way is
> >>>> because there is something going wrong, or about to go wrong, on that
> >>>> particular node. For example as mentioned above, if the user wants to
> >>>> cut down on costs without stopping everything, or if the node is about
> >>>> to run out of disk or needs to be debugged or so on. And in this case,
> >>>> continuing to process the standby tasks while other instances continue
> >>>> to run would pretty much defeat the purpose of pausing it entirely, and
> >>>> might have unpleasant consequences for the unsuspecting developer.
> >>>>
> >>>> All that said, I don't want to block this KIP so if you have strong
> >>>> feelings about the standby behavior I'm happy to back down. I'm only
> >>>> pushing back now because it felt like there wasn't any particular
> >>>> motivation for the standbys to continue processing or not, and I figured
> >>>> I'd try to fill in this gap with my thoughts on the matter :)
> >>>> Either way we should just make sure that this behavior is documented
> >>>> clearly, since it may be surprising if we decide to only pause active
> >>>> processing (another option is to rename the method something like
> >>>> #pauseProcessing or #pauseActiveProcessing so that it's hard to miss).
> >>>>
> >>>> Thanks! Sorry for the lengthy response, but hopefully we won't need to
> >>>> debate this any further. Beyond this I'm satisfied with the latest
> >>>> proposal.
> >>>>
> >>>> On Mon, May 9, 2022 at 5:16 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>
> >>>>> Thanks for the updates, Jim!
> >>>>>
> >>>>> After this discussion and your updates, this KIP looks good to me.
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:
> >>>>>> Hi Sophie, all,
> >>>>>>
> >>>>>> I've updated the KIP with feedback from the discussion so far:
> >>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> >>>>>>
> >>>>>> As a terse summary of my current position:
> >>>>>> Pausing will only stop processing and punctuation (respecting modular
> >>>>>> topologies).
> >>>>>> Paused topologies will still a) consume from input topics, b) call the
> >>>>>> usual commit pathways (commits will happen basically as they would
> >>>>>> have), and c) standBy tasks will still be processed.
> >>>>>>
> >>>>>> Shout if the KIP or those details still need some TLC. Responding to
> >>>>>> Sophie inline below.
> >>>>>> > >>>>>> > >>>>>> On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman > >>>>>> <sop...@confluent.io.invalid> wrote: > >>>>>> > >>>>>>> Don't worry, I'm going to be adding the APIs for topology-level > >>> pausing > >>>>> as > >>>>>>> part of the modular topologies KIP, > >>>>>>> so we don't need to worry about that for now. That said, I don't > >> think > >>>>> we > >>>>>>> should brush it off entirely and design > >>>>>>> this feature in a way that's going to be incompatible or hugely > >> raise > >>>>> the > >>>>>>> LOE on bringing the (mostly already > >>>>>>> implemented) modular topologies feature into the public API, just > >>>>>>> because it "won the race to write a KIP" :) > >>>>>>> > >>>>>> > >>>>>> Yes, I'm hoping that this is all compatible with modular > >> topologies. I > >>>>>> haven't seen anything so far which seems to be a problem; this KIP > is > >>>>> just > >>>>>> in a weird state to discuss details of acting on modular > >> topologies.:) > >>>>>> > >>>>>> > >>>>>>> I may be biased (ok, I definitely am), but I'm not in favor of > >> adding > >>>>> this > >>>>>>> as a state regardless of the modular topologies. > >>>>>>> First of all any change to the KafkaStreams state machine is a > >>> breaking > >>>>>>> change, no? So we would have to wait until > >>>>>>> the next major release which seems like an unnecessary thing to > >> block > >>>>> on. > >>>>>>> (Whether to add this as a state to the > >>>>>>> StreamThread's FSM is an implementation detail). > >>>>>>> > >>>>>> > >>>>>> +1. I am sold on skipping out on new states. I had that as a > >> rejected > >>>>>> alternative in the KIP and have added a few more words to that bit. 
> >>>>>> > >>>>>> > >>>>>>> Also, the semantics of using an `isPaused` method to distinguish a > >>>>> paused > >>>>>>> instance (or topology) make more sense > >>>>>>> to me -- this is a user-specified status, whereas the KafkaStreams > >>>>> state is > >>>>>>> intended to relay the status of the system > >>>>>>> itself. For example, if we are going to continue to poll during > >> pause, > >>>>> then > >>>>>>> shouldn't the client transition to REBALANCING? > >>>>>>> I believe it makes sense to still allow distinguishing these states > >>>>> while a > >>>>>>> client is paused, whereas making PAUSED its > >>>>>>> own state means you can't tell when the client is rebalancing vs > >>>>> running, > >>>>>>> or whether it is paused or dead: presumably > >>>>>>> the NOT_RUNNING/ERROR state would trump the PAUSED state, which > >> means > >>>>> you > >>>>>>> would not be able to rely on > >>>>>>> checking the state to see if you had called PAUSED on that > instance. > >>>>>>> Obviously you can work around this by just > >>>>>>> maintaining a flag in the usercode, but all this feels very > >> unnatural > >>>>> to me > >>>>>>> vs just checking the `#isPaused` API. > >>>>>>> > >>>>>>> On that note, I had one question -- at what point would the > >>> `#isPaused` > >>>>>>> check return true? Would it do so immediately > >>>>>>> after pausing the instance, or only once it has finished committing > >>>>> offsets > >>>>>>> and stopped returning records? > >>>>>>> > >>>>>> > >>>>>> Immediately, `#isPaused` tells you about metadata. > >>>>>> > >>>>>> > >>>>>>> Finally, on the note of punctuators I think it would make most > sense > >>> to > >>>>>>> either pause these as well or else add this an > >>>>>>> an explicit option for the user. 
If this feature is used to, for > >>>>> example, > >>>>>>> help save on processing costs while an app is > >>>>>>> not in use, then it would probably be surprising and perhaps > >> alarming > >>> to > >>>>>>> see certain kinds of processing still continue. > >>>>>>> > >>>>>> > >>>>>> From other parts of the discussion, I'm sold on pausing > punctuation. > >>>>>> > >>>>>> > >>>>>>> The question of whether to continue fetching for standby tasks is > >>> maybe > >>>>> a > >>>>>>> bit more debatable, as it would certainly be > >>>>>>> nice to find your clients all caught up when you go to resume the > >>>>> instance > >>>>>>> again, but I would still strongly suggest > >>>>>>> pausing these as well. To use a similar example, imagine if you > >> paused > >>>>> an > >>>>>>> app because it was about to run out of > >>>>>>> disk. If the standbys kept processing and filled up the remaining > >>> space, > >>>>>>> you'd probably feel a bit betrayed by this API. > >>>>>>> > >>>>>>> WDYT? > >>>>>>> > >>>>>> > >>>>>> At the minute, my moderately held position is that standby tasks > >> ought > >>> to > >>>>>> continue reading and remain caught up. If standby tasks would run > >> out > >>> of > >>>>>> space, there are probably bigger problems. > >>>>>> > >>>>>> If later it is desirable to manage punctuation or standby tasks, > then > >>> it > >>>>>> should be easy for future folks to modify things. > >>>>>> > >>>>>> Overall, I'd frame this KIP as "pause processing resulting in > >> outputs". > >>>>>> > >>>>>> Cheers, > >>>>>> > >>>>>> Jim > >>>>>> > >>>>>> > >>>>>> > >>>>>>> On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <wangg...@gmail.com> > >>>>> wrote: > >>>>>>> > >>>>>>>> I think for named topology we can leave the scope of this KIP as > >> "all > >>>>> or > >>>>>>>> nothing", i.e. when you pause an instance you pause all of its > >>>>>>> topologies. 
> >>>>>>>> I raised this question in my previous email just trying to clarify > >> if > >>>>>>> this > >>>>>>>> is what you have in mind. We can leave the question of finer > >>>>> controlled > >>>>>>>> pausing behavior for later when we have named topology being > >> exposed > >>>>> via > >>>>>>>> another KIP. > >>>>>>>> > >>>>>>>> > >>>>>>>> Guozhang > >>>>>>>> > >>>>>>>> On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org> > >>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Jim, > >>>>>>>>> > >>>>>>>>> Thanks for the replies. This all sounds good to me. Just two > >> further > >>>>>>>>> comments: > >>>>>>>>> > >>>>>>>>> 3. It seems like you should aim for the simplest semantics. If > the > >>>>>>> intent > >>>>>>>>> is to “pause” the instance, then you’d better pause the whole > >>>>> instance. > >>>>>>>> If > >>>>>>>>> you leave punctuations and standbys running, I expect we’d see > bug > >>>>>>>> reports > >>>>>>>>> come in that the instance isn’t really paused. > >>>>>>>>> > >>>>>>>>> 5. Since you won the race to write a KIP, I don’t think it makes > >> too > >>>>>>> much > >>>>>>>>> sense to worry too much about modular topologies. When they > >> propose > >>>>>>> their > >>>>>>>>> KIP, they will have to specify a lot of state management > behavior, > >>>>> and > >>>>>>>>> pause/resume will have to be part of it. If they have some > concern > >>>>>>> about > >>>>>>>>> your KIP, they’ll chime in. It doesn’t make sense for you to try > >> and > >>>>>>>> guess > >>>>>>>>> what that proposal will look like. > >>>>>>>>> > >>>>>>>>> To be honest, you’re proposing a KafkaStreams runtime-level > >>>>>>> pause/resume > >>>>>>>>> function, not a topology-level one anyway, so it seems pretty > >> clear > >>>>>>> that > >>>>>>>> it > >>>>>>>>> would pause the whole runtime (of a single instance) regardless > of > >>>>> any > >>>>>>>>> modular topologies. 
If the intent is to pause individual > >> topologies > >>>>> in > >>>>>>>> the > >>>>>>>>> future, you’d need a different API anyway. > >>>>>>>>> > >>>>>>>>> Thanks! > >>>>>>>>> -John > >>>>>>>>> > >>>>>>>>> On Mon, May 9, 2022, at 08:10, Jim Hughes wrote: > >>>>>>>>>> Hi John, > >>>>>>>>>> > >>>>>>>>>> Long emails are great; responding inline! > >>>>>>>>>> > >>>>>>>>>> On Sat, May 7, 2022 at 4:54 PM John Roesler < > vvcep...@apache.org > >>> > >>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Thanks for the KIP, Jim! > >>>>>>>>>>> > >>>>>>>>>>> This conversation seems to highlight that the KIP needs to > >>>>> specify > >>>>>>>>>>> some of its behavior as well as its APIs, where the behavior is > >>>>>>>>>>> observable and significant to users. > >>>>>>>>>>> > >>>>>>>>>>> For example: > >>>>>>>>>>> > >>>>>>>>>>> 1. Do you plan to have a guarantee that immediately after > >>>>>>>>>>> calling KafkaStreams.pause(), users should observe that the > >>>>> instance > >>>>>>>>>>> stops processing new records? Or should they expect that the > >>>>> threads > >>>>>>>>>>> will continue to process some records and pause asynchronously > >>>>>>>>>>> (you already answered this in the thread earlier)? > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I'm happy to build up to a guarantee of sorts. My current idea > >> is > >>>>>>> that > >>>>>>>>>> pause() does not do anything "exceptional" to get control back > >>>>> from a > >>>>>>>>>> running topology. A currently running topology would get to > >>>>> complete > >>>>>>>> its > >>>>>>>>>> loop. > >>>>>>>>>> > >>>>>>>>>> Separately, I'm still piecing together how commits work. By > some > >>>>>>>>>> mechanism, after a pause, I do agree that the topology needs to > >>>>>>> commit > >>>>>>>>> its > >>>>>>>>>> work in some manner. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> 2. 
Will the threads continue to poll new records until they > >>>>>>> naturally > >>>>>>>>> fill > >>>>>>>>>>> up the task buffers, or will they immediately pause their > >>>>> Consumers > >>>>>>>>>>> as well? > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Presently, I'm suggesting that consumers would fill up their > >>>>> buffers. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> 3. Will threads continue to call (system time) punctuators, or > >>>>> would > >>>>>>>>>>> punctuations also be paused? > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> In my first pass at thinking through this, I left the > punctuators > >>>>>>>>> running. > >>>>>>>>>> To be honest, I'm not sure what they do, so my approach is > either > >>>>>>> lucky > >>>>>>>>> and > >>>>>>>>>> correct or it could be Very Clearly Wrong.;) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> I realize that some of those questions simply may not have > >>>>> occurred > >>>>>>> to > >>>>>>>>>>> you, so this is not a criticism for leaving them off; I'm just > >>>>>>>> pointing > >>>>>>>>> out > >>>>>>>>>>> that although we don't tend to mention implementation details > in > >>>>>>> KIPs, > >>>>>>>>>>> we also can't be too high level, since there are a lot of > >>>>>>> operational > >>>>>>>>>>> details that users rely on to achieve various behaviors in > >>>>> Streams. > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Ayup, I will add some details as we iron out the guarantees, > >>>>>>>>> implementation > >>>>>>>>>> details that are at the API level. This one is tough since > >>>>> internal > >>>>>>>>>> features like NamedTopologies are part of the discussion. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> A couple more comments: > >>>>>>>>>>> > >>>>>>>>>>> 4. +1 to what Guozhang said. It seems like we should we also do > >> a > >>>>>>>> commit > >>>>>>>>>>> before entering the paused state. That way, any open > >> transactions > >>>>>>>> would > >>>>>>>>>>> be closed and not have to worry about timing out. 
Even under > >>>>> ALOS, > >>>>>>> it > >>>>>>>>>>> seems best to go ahead and complete the processing of in-flight > >>>>>>>> records > >>>>>>>>>>> by committing. That way, if anything happens to die while it's > >>>>>>> paused, > >>>>>>>>>>> existing > >>>>>>>>>>> work won't have to be repeated. Plus, if there are any > >> processors > >>>>>>> with > >>>>>>>>> side > >>>>>>>>>>> effects, users won't have to tolerate weird edge cases where a > >>>>> pause > >>>>>>>>> occurs > >>>>>>>>>>> after a processor sees a record, but before the result is sent > >> to > >>>>>>> its > >>>>>>>>>>> outputs. > >>>>>>>>>>> > >>>>>>>>>>> 5. I noticed that you proposed not to add a PAUSED state, but I > >>>>>>> didn't > >>>>>>>>>>> follow > >>>>>>>>>>> the rationale. Adding a state seems beneficial for a number of > >>>>>>>> reasons: > >>>>>>>>>>> StreamThreads already use the thread state to determine whether > >>>>> to > >>>>>>>>> process > >>>>>>>>>>> or not, so avoiding a new State would just mean adding a > >> separate > >>>>>>> flag > >>>>>>>>> to > >>>>>>>>>>> track > >>>>>>>>>>> and then checking your new flag in addition to the State in the > >>>>>>>> thread. > >>>>>>>>>>> Also, > >>>>>>>>>>> operating Streams applications is a non-trivial task, and users > >>>>> rely > >>>>>>>> on > >>>>>>>>>>> the State > >>>>>>>>>>> (and transitions) to understand Streams's behavior. Adding a > >>>>> PAUSED > >>>>>>>>> state > >>>>>>>>>>> is an elegant way to communicate to operators what is happening > >>>>> with > >>>>>>>> the > >>>>>>>>>>> application. Note that the person digging though logs and > >>>>> metrics, > >>>>>>>>> trying > >>>>>>>>>>> to understand why the application isn't doing anything is > >>>>> probably > >>>>>>> not > >>>>>>>>>>> going > >>>>>>>>>>> to be the same person who is calling pause() and resume(). > Also, > >>>>> if > >>>>>>>> you > >>>>>>>>> add > >>>>>>>>>>> a state, you don't need `isPaused()`. > >>>>>>>>>>> > >>>>>>>>>>> 5b. 
If you buy the arguments to go ahead and commit as well as > >>>>> the > >>>>>>>>>>> argument to add a State, then I'd also suggest to follow the > >>>>>>> existing > >>>>>>>>>>> patterns > >>>>>>>>>>> for the shutdown states by also adding PAUSING. That > >>>>>>>>>>> way, you'll also expose a way to understand that Streams > >> received > >>>>>>> the > >>>>>>>>>>> signal > >>>>>>>>>>> to pause, and that it's still processing and committing some > >>>>> records > >>>>>>>> in > >>>>>>>>>>> preparation to enter a PAUSED state. I'm not sure if a RESUMING > >>>>>>> state > >>>>>>>>> would > >>>>>>>>>>> also make sense. > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I hit a tricky bit when thinking through having a PAUSED > >>>>> state... If > >>>>>>>> one > >>>>>>>>>> is using Named Topologies, and some of them are paused, what > >>>>> state is > >>>>>>>> the > >>>>>>>>>> Streams instance in? If we can agree on that, things may become > >>>>>>>>> clear.... > >>>>>>>>>> I can see two quick ideas: > >>>>>>>>>> > >>>>>>>>>> 1. The state is RUNNING and NamedTopologies have some other way > >>>>> to > >>>>>>>>>> indicate state. > >>>>>>>>>> > >>>>>>>>>> 2. The state is something messy like PARTIALLY_PAUSED to > reflect > >>>>>>> that > >>>>>>>>> the > >>>>>>>>>> instance has something interesting going on. > >>>>>>>>>> > >>>>>>>>>> When I poked at things initially, I did try out having different > >>>>>>>> states, > >>>>>>>>>> and I readily agree that a PAUSING state may make sense. > >>>>> (Especially > >>>>>>>> if > >>>>>>>>>> there's a need to run commits before transitioning all the way > to > >>>>>>>>> PAUSED.) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> And that's all I have to say about that. I hope you don't find > >> my > >>>>>>>>>>> long message offputting. 
I'm fundamentally in favor of your > KIP, > >>>>>>>>>>> and I think with a little more explanation in the KIP, and a > few > >>>>>>>>>>> small tweaks to the proposal, we'll be able to provide good > >>>>>>>>>>> ergonomics to our users. > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Thanks! > >>>>>>>>>> > >>>>>>>>>> Jim > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> -John > >>>>>>>>>>> > >>>>>>>>>>> On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote: > >>>>>>>>>>>> I'm in favor of the "just pausing the instance itself“ option > >>>>> as > >>>>>>>>> well. As > >>>>>>>>>>>> for EOS, the point is that when the processing is paused, we > >>>>> would > >>>>>>>> not > >>>>>>>>>>>> trigger any `producer.send` during the time, and the > >>>>> transaction > >>>>>>>>> timeout > >>>>>>>>>>> is > >>>>>>>>>>>> sort of relying on that behavior, so my point was that it's > >>>>>>> probably > >>>>>>>>>>> better > >>>>>>>>>>>> to also commit the processing before we pause it. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Guozhang > >>>>>>>>>>>> > >>>>>>>>>>>> On Fri, May 6, 2022 at 6:12 PM Jim Hughes > >>>>>>>>> <jhug...@confluent.io.invalid> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Matthias, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Since the only thing which will be paused is processing the > >>>>>>>>> topology, I > >>>>>>>>>>>>> think we can let commits happen naturally. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Good point about getting the paused state to new members; it > >>>>> is > >>>>>>>>> seeming > >>>>>>>>>>>>> like the "building block" approach is a good one to keep > >>>>> things > >>>>>>>>> simple > >>>>>>>>>>> at > >>>>>>>>>>>>> first. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jim > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, May 6, 2022 at 8:31 PM Matthias J. 
Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think it's tricky to propagate a pauseAll() via the rebalance
> >>>>>>>>>>>>>> protocol. New members joining the group would need to get paused,
> >>>>>>>>>>>>>> too? Could there be weird race conditions with overlapping
> >>>>>>>>>>>>>> pauseAll() and resumeAll() calls on different instances while
> >>>>>>>>>>>>>> there could be errors / network partitions or similar?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would argue that similar to IQ, we provide the basic building
> >>>>>>>>>>>>>> blocks, and leave it to the users to implement cross instance
> >>>>>>>>>>>>>> management for a pauseAll() scenario. -- Also, if there is really
> >>>>>>>>>>>>>> demand, we can always add pauseAll()/resumeAll() as follow up work.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About named topologies: I agree with Jim to not include them in
> >>>>>>>>>>>>>> this KIP as they are not a public feature yet. If we make named
> >>>>>>>>>>>>>> topologies public, the corresponding KIP should extend the
> >>>>>>>>>>>>>> pause/resume feature (ie, APIs) accordingly. Of course, the code
> >>>>>>>>>>>>>> can (and should) already be set up to support it to be future proof.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Good call out about commit and EOS -- to simplify it, I think it
> >>>>>>>>>>>>>> might be good to commit also for the at-least-once case?
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On 5/6/22 1:05 PM, Jim Hughes wrote: > >>>>>>>>>>>>>>> Hi Bill, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Great questions; I'll do my best to reply inline: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, May 6, 2022 at 3:21 PM Bill Bejeck < > >>>>>>> bbej...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jim, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the KIP. I have a couple of meta-questions as > >>>>>>>> well: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 1) Regarding pausing only a subset of running instances, > >>>>> I'm > >>>>>>>>>>> thinking > >>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>> may be a use case for pausing all of them. > >>>>>>>>>>>>>>>> Would it make sense to also allow for pausing all > >>>>>>>> instances > >>>>>>>>> by > >>>>>>>>>>>>>> adding a > >>>>>>>>>>>>>>>> method `pauseAll()` or something similar? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Honestly, I'm indifferent on this point. Presently, I > >>>>> think > >>>>>>>>> what I > >>>>>>>>>>>>> have > >>>>>>>>>>>>>>> proposed is the minimal change to get the ability to pause > >>>>>>> and > >>>>>>>>>>> resume > >>>>>>>>>>>>>>> processing. If adding a 'pauseAll()' is required, I'd be > >>>>>>> happy > >>>>>>>>> to > >>>>>>>>>>> do > >>>>>>>>>>>>>> that! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> From Guozhang's email, it sounds like this would require > >>>>>>> using > >>>>>>>>> the > >>>>>>>>>>>>>>> rebalance protocol to trigger the coordination. Would > >>>>> there > >>>>>>> be > >>>>>>>>>>> enough > >>>>>>>>>>>>>> room > >>>>>>>>>>>>>>> in that approach to indicate that a named topology is to > >>>>> be > >>>>>>>>> paused > >>>>>>>>>>>>> across > >>>>>>>>>>>>>>> all nodes? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 2) Would pausing affect standby tasks? 
For example, > >>>>> imagine > >>>>>>>>> there > >>>>>>>>>>>>> are 3 > >>>>>>>>>>>>>>>> instances A, B, and C. > >>>>>>>>>>>>>>>> A user elects to pause instance C only but it hosts > >>>>> the > >>>>>>>>> standby > >>>>>>>>>>>>>> tasks > >>>>>>>>>>>>>>>> for A. > >>>>>>>>>>>>>>>> Would the standby tasks on the paused application > >>>>>>> continue > >>>>>>>>> to > >>>>>>>>>>> read > >>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>> the changelog topic? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Yes, standby tasks would continue reading from the > >>>>> changelog > >>>>>>>>> topic. > >>>>>>>>>>>>> All > >>>>>>>>>>>>>>> consumers would continue reading to avoid getting dropped > >>>>>>> from > >>>>>>>>> their > >>>>>>>>>>>>>>> consumer groups. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Jim > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks! > >>>>>>>>>>>>>>>> Bill > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 2:44 PM Jim Hughes > >>>>>>>>>>>>> <jhug...@confluent.io.invalid > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Guozhang, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for the feedback; responses inline below: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang < > >>>>>>>>> wangg...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hello Jim, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks for the proposed KIP. I have some meta questions > >>>>>>>> about > >>>>>>>>> it: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1) Would an instance always pause/resume all of its > >>>>>>> current > >>>>>>>>> owned > >>>>>>>>>>>>>>>>>> topologies (i.e. the named topologies), or are there > >>>>> any > >>>>>>>>>>> scenarios > >>>>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> only want to pause/resume a subset of them? 
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> An instance may wish to pause some of its named topologies. I was
> >>>>>>>>>>>>>>>>> unsure what to say about named topologies in the KIP since they
> >>>>>>>>>>>>>>>>> seem to be an internal detail at the moment.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I intend to add to KafkaStreamsNamedTopologyWrapper methods like:
> >>>>>>>>>>>>>>>>>     public void pauseNamedTopology(final String topologyToPause)
> >>>>>>>>>>>>>>>>>     public boolean isNamedTopologyPaused(final String topology)
> >>>>>>>>>>>>>>>>>     public void resumeNamedTopology(final String topologyToResume)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2) From a user's perspective, do we want to always issue a
> >>>>>>>>>>>>>>>>>> `pause/resume` to all the instances or not? For example, we can
> >>>>>>>>>>>>>>>>>> define the semantics of the function as "you only need to call
> >>>>>>>>>>>>>>>>>> this function on any of the application's instances, and all
> >>>>>>>>>>>>>>>>>> instances would then pause (via the rebalance error codes)", or
> >>>>>>>>>>>>>>>>>> as "you would call this function for all the instances of an
> >>>>>>>>>>>>>>>>>> application". Which one are you referring to?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> My initial intent is that one would call this function on any
> >>>>>>>>>>>>>>>>> instances of the application that one wishes to pause. This should
> >>>>>>>>>>>>>>>>> allow more control (in case one wanted to pause a portion of the
> >>>>>>>>>>>>>>>>> instances). On the other hand, this approach would put more work
> >>>>>>>>>>>>>>>>> on the implementer to coordinate calling pause or resume across
> >>>>>>>>>>>>>>>>> instances.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If the other option is more suitable, happy to do that instead.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3) With EOS, there's a transaction timeout which would determine
> >>>>>>>>>>>>>>>>>> how long a transaction can stay idle before it's force-aborted on
> >>>>>>>>>>>>>>>>>> the broker side. I think when a pause is issued, that means we'd
> >>>>>>>>>>>>>>>>>> need to immediately commit the current transaction for EOS since
> >>>>>>>>>>>>>>>>>> we do not know how long we could pause for. Is that right? If yes
> >>>>>>>>>>>>>>>>>> could you please clarify that in the doc as well.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Good point. My intent is for pause() to wait for the next iteration
> >>>>>>>>>>>>>>>>> through `runOnce()` and then only skip over the processing for
> >>>>>>>>>>>>>>>>> paused tasks in `taskManager.process(numIterations, time)`.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Do commits live inside that call or do they live across/outside of
> >>>>>>>>>>>>>>>>> it? In the former case, I think there shouldn't be any issues with
> >>>>>>>>>>>>>>>>> EOS. Otherwise, we may need to work through some details to get
> >>>>>>>>>>>>>>>>> EOS right.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Once we figure that out, I can update the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes
> >>>>>>>>>>>>>>>>>> <jhug...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I have written up a KIP for adding the ability to pause and
> >>>>>>>>>>>>>>>>>>> resume the processing of a topology in AK Streams. The KIP is
> >>>>>>>>>>>>>>>>>>> here:
> >>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks in advance for your feedback!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Jim
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
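
For anyone skimming the thread, here is a minimal sketch of the semantics it converges on, written as a toy model rather than against the real client. It assumes the points agreed above: the paused flag is user-set and separate from the client state, `pause()` may be called before `start()`, `isPaused()` reflects the flag immediately, and a paused instance skips processing, punctuation and standby updates while still polling and committing. The names `PauseSketch`, `PausableRuntime` and the counter fields are hypothetical and exist only for illustration; they are not the KafkaStreams API or its internals.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class PauseSketch {
    // Hypothetical lifecycle states; only the two needed for this illustration.
    enum State { CREATED, RUNNING }

    static final class PausableRuntime {
        // User-set flag, deliberately kept out of the state machine so that
        // the state can still be reported while the instance is paused.
        private final AtomicBoolean paused = new AtomicBoolean(false);
        private volatile State state = State.CREATED;

        // Counters standing in for real work, so the effect of a pause is visible.
        int polls, processed, punctuations, standbyUpdates, commits;

        void pause()       { paused.set(true); }   // allowed before start()
        void resume()      { paused.set(false); }
        boolean isPaused() { return paused.get(); } // true immediately after pause()
        State state()      { return state; }
        void start()       { state = State.RUNNING; }

        // One pass of a simplified processing loop (an assumption, not the
        // actual StreamThread loop).
        void runOnce() {
            if (state != State.RUNNING) {
                return;
            }
            polls++;                  // keep polling to stay in the consumer group
            if (!paused.get()) {
                processed++;          // active task processing
                punctuations++;       // punctuators
                standbyUpdates++;     // standby tasks
            }
            commits++;                // commit pathway runs as usual
        }
    }

    public static void main(String[] args) {
        PausableRuntime rt = new PausableRuntime();
        rt.pause();                   // start the instance in a paused state
        rt.start();
        rt.runOnce();
        System.out.println(rt.state() + " paused=" + rt.isPaused()
                + " processed=" + rt.processed + " polls=" + rt.polls);
        rt.resume();
        rt.runOnce();
        System.out.println(rt.state() + " paused=" + rt.isPaused()
                + " processed=" + rt.processed + " polls=" + rt.polls);
    }
}
```

Keeping the flag outside the state enum is the design choice argued for in the thread: an operator can still tell RUNNING from other states while the instance is paused, and calling `pause()` ahead of `start()` avoids the race Matthias pointed out.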