Hi Jim,

Thanks for the KIP. Overall LGTM!

One late question: could we run the Streams reset tool (i.e. kafka-streams-application-reset.sh) while the application is paused? I can imagine a use case where, after pausing for a while, a user just wants to continue from the latest offset, skipping the intermediate records.

Thank you.
Luke

On Wed, May 11, 2022 at 10:12 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Matthias,

I like it. I've updated the KIP to reflect that detail; I put the details in the docs for pause.

Cheers,
Jim

On Tue, May 10, 2022 at 7:51 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP. Overall LGTM.

Can we clarify one question: would it be allowed to call `pause()` before calling `start()`? I don't see any reason why we would need to disallow it.

It could be helpful to start a KafkaStreams client in paused state -- otherwise there is a race between calling `start()` and calling `pause()`.

If we allow it, we should clearly document it.

-Matthias

On 5/10/22 12:04 PM, Jim Hughes wrote:

Hi Bill, all,

Thank you. I've updated the KIP to reflect pausing standby tasks as well. I think all the outstanding points have been addressed, and I'm going to start the vote thread!

Cheers,
Jim

On Tue, May 10, 2022 at 2:43 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi Jim,

After reading the comments on the KIP, I agree that it makes sense to pause all activities; any changes can be made later on.

Thanks,
Bill

On Tue, May 10, 2022 at 4:03 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Jim,

Thanks for the KIP!

I am fine with the KIP in general.

However, I am with Sophie and John on also pausing the standbys, for the reasons they brought up.
Is there a specific reason you want to keep standbys going? It feels like premature optimization to me. We can still add the option to keep standbys running in a follow-up if needed.

Best,
Bruno

On 10.05.22 05:15, Sophie Blee-Goldman wrote:

Thanks Jim, just one note/question on the standby tasks:

> At the minute, my moderately held position is that standby tasks ought to continue reading and remain caught up. If standby tasks would run out of space, there are probably bigger problems.

For a single-node application, or when the #pause API is invoked on all instances, there won't be any further active processing and thus nothing to keep up with, right? So for that case, it's just a matter of whether any lagging standbys will have the chance to catch up to the (paused) active task state before they stop as well, in which case having them continue feels fine to me. However, this is a relatively trivial benefit, and I would only consider it a deciding factor if all things were otherwise equal.

My concern is the more interesting case: when this feature is used to pause only one node, or some subset of the overall application. In this case, yes, the standby tasks will indeed fall out of sync. But the only reason I can imagine someone using the pause feature in such a way is that something is going wrong, or about to go wrong, on that particular node.
For example, as mentioned above, the user may want to cut down on costs without stopping everything, or the node may be about to run out of disk or need to be debugged, and so on. In this case, continuing to process the standby tasks while other instances continue to run would pretty much defeat the purpose of pausing the node, and might have unpleasant consequences for the unsuspecting developer.

All that said, I don't want to block this KIP, so if you have strong feelings about the standby behavior, I'm happy to back down. I'm only pushing back now because it felt like there wasn't any particular motivation for the standbys to continue processing or not, and I figured I'd try to fill in this gap with my thoughts on the matter :) Either way, we should make sure that this behavior is documented clearly, since it may be surprising if we decide to only pause active processing (another option is to rename the method to something like #pauseProcessing or #pauseActiveProcessing so that it's hard to miss).

Thanks! Sorry for the lengthy response, but hopefully we won't need to debate this any further. Beyond this, I'm satisfied with the latest proposal.

On Mon, May 9, 2022 at 5:16 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the updates, Jim!

After this discussion and your updates, this KIP looks good to me.
Thanks,
John

On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:

Hi Sophie, all,

I've updated the KIP with feedback from the discussion so far:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832

As a terse summary of my current position: pausing will only stop processing and punctuation (respecting modular topologies). Paused topologies will still a) consume from input topics, b) call the usual commit pathways (commits will happen basically as they would have), and c) process standby tasks.

Shout if the KIP or those details still need some TLC. Responding to Sophie inline below.

On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> Don't worry, I'm going to be adding the APIs for topology-level pausing as part of the modular topologies KIP, so we don't need to worry about that for now. That said, I don't think we should brush it off entirely and design this feature in a way that's going to be incompatible with, or hugely raise the LOE of, bringing the (mostly already implemented) modular topologies feature into the public API, just because it "won the race to write a KIP" :)

Yes, I'm hoping that this is all compatible with modular topologies.
I haven't seen anything so far that seems to be a problem; this KIP is just in an awkward position for discussing the details of acting on modular topologies. :)

> I may be biased (ok, I definitely am), but I'm not in favor of adding this as a state regardless of the modular topologies. First of all, any change to the KafkaStreams state machine is a breaking change, no? So we would have to wait until the next major release, which seems like an unnecessary thing to block on. (Whether to add this as a state to the StreamThread's FSM is an implementation detail.)

+1. I am sold on skipping out on new states. I had that as a rejected alternative in the KIP and have added a few more words to that bit.

> Also, the semantics of using an `isPaused` method to distinguish a paused instance (or topology) make more sense to me -- this is a user-specified status, whereas the KafkaStreams state is intended to relay the status of the system itself. For example, if we are going to continue to poll during pause, then shouldn't the client transition to REBALANCING? I believe it makes sense to still allow distinguishing these states while a client is paused, whereas making PAUSED its own state means you can't tell whether the client is rebalancing or running, or whether it is paused or dead: presumably the NOT_RUNNING/ERROR state would trump the PAUSED state, which means you would not be able to rely on checking the state to see if you had called pause() on that instance.
> Obviously you can work around this by just maintaining a flag in the user code, but all this feels very unnatural to me compared with just checking the `#isPaused` API.
>
> On that note, I had one question -- at what point would the `#isPaused` check return true? Would it do so immediately after pausing the instance, or only once it has finished committing offsets and stopped returning records?

Immediately; `#isPaused` tells you about metadata.

> Finally, on the note of punctuators, I think it would make most sense to either pause these as well or else add this as an explicit option for the user. If this feature is used to, for example, help save on processing costs while an app is not in use, then it would probably be surprising, and perhaps alarming, to see certain kinds of processing still continue.

From other parts of the discussion, I'm sold on pausing punctuation.

> The question of whether to continue fetching for standby tasks is maybe a bit more debatable, as it would certainly be nice to find your clients all caught up when you go to resume the instance again, but I would still strongly suggest pausing these as well. To use a similar example, imagine you paused an app because it was about to run out of disk. If the standbys kept processing and filled up the remaining space, you'd probably feel a bit betrayed by this API.
>
> WDYT?
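Sophie's argument above is that "paused" is user-set metadata, while the KafkaStreams state reports what the system itself is doing. The small Java model below illustrates that split. It is a hypothetical toy, not the real KafkaStreams class: `ToyStreamsClient`, the subset of state names, and the `may*` helpers are illustrative assumptions. It encodes the behavior the thread converged on. `isPaused()` flips immediately. A paused instance stops active processing, standby updates, and punctuation, while a rebalance or an error remains visible through the state.

```java
// Hypothetical toy model, not the real KafkaStreams class. The lifecycle
// state is driven by the system; the paused flag is set only by the user.
class ToyStreamsClient {
    enum State { CREATED, REBALANCING, RUNNING, NOT_RUNNING, ERROR }

    private volatile State state = State.CREATED;
    private volatile boolean paused = false;

    // User-facing controls only flip metadata, so isPaused() is true
    // immediately; stream threads would notice the flag on their next loop.
    void pause() { paused = true; }
    void resume() { paused = false; }
    boolean isPaused() { return paused; }
    State state() { return state; }

    // System-driven transitions, independent of the paused flag.
    void onRebalanceStarted() { state = State.REBALANCING; }
    void onRebalanceCompleted() { state = State.RUNNING; }
    void onFatalError() { state = State.ERROR; }

    // Toy policy matching where the thread ended up: a paused instance does
    // no active processing, no standby updates, and no punctuation.
    boolean mayProcessActiveTasks() { return state == State.RUNNING && !paused; }
    boolean mayUpdateStandbys() { return state == State.RUNNING && !paused; }
    boolean mayPunctuate() { return state == State.RUNNING && !paused; }

    public static void main(String[] args) {
        ToyStreamsClient client = new ToyStreamsClient();
        client.onRebalanceCompleted();
        client.pause();
        client.onRebalanceStarted(); // e.g. another member joined while paused
        System.out.println(client.state() + " paused=" + client.isPaused()); // prints REBALANCING paused=true
    }
}
```

Because the two signals are orthogonal, an operator can still tell a paused, rebalancing instance from a paused, failed one. A single PAUSED state would hide that distinction.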
At the minute, my moderately held position is that standby tasks ought to continue reading and remain caught up. If standby tasks would run out of disk space, there are probably bigger problems.

If it later becomes desirable to manage punctuation or standby tasks, it should be easy for future folks to modify things.

Overall, I'd frame this KIP as "pause processing that results in outputs".

Cheers,
Jim

On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <wangg...@gmail.com> wrote:

I think for named topologies we can leave the scope of this KIP as "all or nothing", i.e. when you pause an instance, you pause all of its topologies. I raised this question in my previous email just to clarify whether this is what you have in mind. We can leave the question of finer-grained pausing behavior for later, when named topologies are exposed via another KIP.

Guozhang

On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org> wrote:

Hi Jim,

Thanks for the replies. This all sounds good to me. Just two further comments:

3. It seems like you should aim for the simplest semantics. If the intent is to “pause” the instance, then you’d better pause the whole instance.
If you leave punctuations and standbys running, I expect we’d see bug reports come in saying that the instance isn’t really paused.

5. Since you won the race to write a KIP, I don’t think it makes much sense to worry about modular topologies. When they propose their KIP, they will have to specify a lot of state management behavior, and pause/resume will have to be part of it. If they have some concern about your KIP, they’ll chime in. It doesn’t make sense for you to try to guess what that proposal will look like.

To be honest, you’re proposing a KafkaStreams runtime-level pause/resume function, not a topology-level one anyway, so it seems pretty clear that it would pause the whole runtime (of a single instance) regardless of any modular topologies. If the intent is to pause individual topologies in the future, you’d need a different API anyway.

Thanks!
-John

On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:

Hi John,

Long emails are great; responding inline!

On Sat, May 7, 2022 at 4:54 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks for the KIP, Jim!
>
> This conversation seems to highlight that the KIP needs to specify some of its behavior as well as its APIs, where the behavior is observable and significant to users.
>
> For example:
>
> 1. Do you plan to have a guarantee that immediately after calling KafkaStreams.pause(), users should observe that the instance stops processing new records? Or should they expect that the threads will continue to process some records and pause asynchronously (you already answered this in the thread earlier)?

I'm happy to build up to a guarantee of sorts. My current idea is that pause() does not do anything "exceptional" to get control back from a running topology. A currently running topology would get to complete its loop.

Separately, I'm still piecing together how commits work. I do agree that, after a pause, the topology needs to commit its work by some mechanism.

> 2. Will the threads continue to poll new records until they naturally fill up the task buffers, or will they immediately pause their Consumers as well?

Presently, I'm suggesting that consumers would fill up their buffers.

> 3. Will threads continue to call (system time) punctuators, or would punctuations also be paused?

In my first pass at thinking through this, I left the punctuators running.
To be honest, I'm not sure what they do, so my approach is either lucky and correct or it could be Very Clearly Wrong. ;)

> I realize that some of those questions simply may not have occurred to you, so this is not a criticism for leaving them off; I'm just pointing out that although we don't tend to mention implementation details in KIPs, we also can't be too high level, since there are a lot of operational details that users rely on to achieve various behaviors in Streams.

Ayup, I will add some details as we iron out the guarantees and the implementation details that are visible at the API level. This one is tough since internal features like NamedTopologies are part of the discussion.

> A couple more comments:
>
> 4. +1 to what Guozhang said. It seems like we should also do a commit before entering the paused state. That way, any open transactions would be closed and would not risk timing out. Even under ALOS, it seems best to go ahead and complete the processing of in-flight records by committing. That way, if anything dies while the instance is paused, existing work won't have to be repeated.
> Plus, if there are any processors with side effects, users won't have to tolerate weird edge cases where a pause occurs after a processor sees a record but before the result is sent to its outputs.
>
> 5. I noticed that you proposed not to add a PAUSED state, but I didn't follow the rationale. Adding a state seems beneficial for a number of reasons. StreamThreads already use the thread state to determine whether to process or not, so avoiding a new State would just mean adding a separate flag to track and then checking your new flag in addition to the State in the thread. Also, operating Streams applications is a non-trivial task, and users rely on the State (and its transitions) to understand Streams's behavior. Adding a PAUSED state is an elegant way to communicate to operators what is happening with the application. Note that the person digging through logs and metrics, trying to understand why the application isn't doing anything, is probably not going to be the same person who is calling pause() and resume(). Also, if you add a state, you don't need `isPaused()`.
>
> 5b.
> If you buy the argument to go ahead and commit, as well as the argument to add a State, then I'd also suggest following the existing pattern for the shutdown states by also adding PAUSING. That way, you'll also expose a way to understand that Streams received the signal to pause, and that it's still processing and committing some records in preparation for entering the PAUSED state. I'm not sure if a RESUMING state would also make sense.

I hit a tricky bit when thinking through having a PAUSED state... If one is using Named Topologies, and some of them are paused, what state is the Streams instance in? If we can agree on that, things may become clearer. I can see two quick ideas:

1. The state is RUNNING, and NamedTopologies have some other way to indicate state.

2. The state is something messy like PARTIALLY_PAUSED to reflect that the instance has something interesting going on.

When I poked at things initially, I did try out having different states, and I readily agree that a PAUSING state may make sense (especially if there's a need to run commits before transitioning all the way to PAUSED).

> And that's all I have to say about that. I hope you don't find my long message off-putting.
> I'm fundamentally in favor of your KIP, and I think with a little more explanation in the KIP and a few small tweaks to the proposal, we'll be able to provide good ergonomics to our users.

Thanks!
Jim

> Thanks,
> -John

On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:

I'm in favor of the "just pausing the instance itself" option as well. As for EOS, the point is that when processing is paused, we would not trigger any `producer.send` during that time, and the transaction timeout sort of relies on that behavior, so my point was that it's probably better to also commit the processing before we pause it.

Guozhang

On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Matthias,

Since the only thing that will be paused is processing the topology, I think we can let commits happen naturally.

Good point about getting the paused state to new members; it seems like the "building block" approach is a good one to keep things simple at first.

Cheers,
Jim

On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:

I think it's tricky to propagate a pauseAll() via the rebalance protocol. Would new members joining the group need to get paused, too? Could there be weird race conditions with overlapping pauseAll() and resumeAll() calls on different instances while there are errors, network partitions, or similar?

I would argue that, similar to IQ, we provide the basic building blocks and leave it to users to implement cross-instance management for a pauseAll() scenario. -- Also, if there is real demand, we can always add pauseAll()/resumeAll() as follow-up work.

About named topologies: I agree with Jim not to include them in this KIP, as they are not a public feature yet. If we make named topologies public, the corresponding KIP should extend the pause/resume feature (ie, APIs) accordingly. Of course, the code can (and should) already be set up to support it, to be future-proof.

Good call-out about commit and EOS -- to simplify it, I think it might be good to commit in the at-least-once case as well?
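The commit-before-pause suggestion (Guozhang, John, and Matthias above) can be illustrated with a toy model. `CommitOnPauseDemo`, the 60-second timeout, and the simplified coordinator check are assumptions for illustration only. In Kafka, the transaction timeout is the producer's `transaction.timeout.ms`, enforced by the broker's transaction coordinator. Under Jim's proposal, the regular commit pathway may also keep running while the instance is paused. The sketch assumes nothing else commits during the pause and compares pausing with and without an immediate commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not Kafka Streams internals) of "commit before
// entering the paused state" under EOS. Time is a logical clock in ms, and we
// assume nothing else commits while the instance is paused.
class CommitOnPauseDemo {
    static final long TXN_TIMEOUT_MS = 60_000; // assumed value, illustration only

    private final boolean commitOnPause;
    private final List<String> pending = new ArrayList<>();   // sent, not yet committed
    private final List<String> committed = new ArrayList<>();
    private long txnStartMs = -1; // -1 means no open transaction
    private boolean paused = false;
    private boolean aborted = false;

    CommitOnPauseDemo(boolean commitOnPause) {
        this.commitOnPause = commitOnPause;
    }

    void process(String record, long nowMs) {
        if (paused) {
            return;                 // paused: no processing, no producer sends
        }
        if (txnStartMs < 0) {
            txnStartMs = nowMs;     // the first send opens a transaction
        }
        pending.add(record);
    }

    void pause() {
        paused = true;
        if (commitOnPause) {
            commit();               // close the open transaction right away
        }
    }

    void commit() {
        committed.addAll(pending);
        pending.clear();
        txnStartMs = -1;
    }

    // Simplified coordinator check: a transaction open longer than the timeout
    // is aborted, and its records would have to be reprocessed later.
    void coordinatorTick(long nowMs) {
        if (txnStartMs >= 0 && nowMs - txnStartMs > TXN_TIMEOUT_MS) {
            aborted = true;
            pending.clear();
            txnStartMs = -1;
        }
    }

    boolean wasAborted() { return aborted; }
    List<String> committed() { return committed; }

    public static void main(String[] args) {
        for (boolean commitOnPause : new boolean[] {false, true}) {
            CommitOnPauseDemo demo = new CommitOnPauseDemo(commitOnPause);
            demo.process("a", 0);
            demo.process("b", 10);
            demo.pause();                   // the user pauses for ten minutes
            demo.coordinatorTick(600_000);
            System.out.println("commitOnPause=" + commitOnPause
                + " aborted=" + demo.wasAborted() + " committed=" + demo.committed());
        }
    }
}
```

Without the commit, the open transaction outlives the timeout and the already-processed records are aborted, so they must be reprocessed. With the commit, the pause leaves nothing in flight.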
-Matthias

On 5/6/22 1:05 PM, Jim Hughes wrote:

Hi Bill,

Great questions; I'll do my best to reply inline:

On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Jim,
>
> Thanks for the KIP. I have a couple of meta-questions as well:
>
> 1) Regarding pausing only a subset of running instances, I'm thinking there may be a use case for pausing all of them. Would it make sense to also allow for pausing all instances by adding a method `pauseAll()` or something similar?

Honestly, I'm indifferent on this point. Presently, I think what I have proposed is the minimal change to get the ability to pause and resume processing. If adding a `pauseAll()` is required, I'd be happy to do that!

From Guozhang's email, it sounds like this would require using the rebalance protocol to trigger the coordination. Would there be enough room in that approach to indicate that a named topology is to be paused across all nodes?
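Matthias suggests leaving cross-instance coordination to users instead of building `pauseAll()` into the rebalance protocol. One way to do that is an application-side fan-out over per-instance `pause()`/`resume()`. `Pausable`, `FakeInstance`, and `PauseCoordinator` are hypothetical names. How the application actually reaches each remote instance (an RPC or admin endpoint, for example) is out of scope here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical application-side registry that fans out pause()/resume()
// to every instance it knows about. Not a Kafka API.
class PauseCoordinator {
    private final List<Pausable> instances = new ArrayList<>();
    private boolean wantPaused = false;

    // Instances registered after pauseAll() inherit the desired state. An
    // instance started outside this registry would be missed.
    void register(Pausable instance) {
        if (wantPaused) {
            instance.pause();
        }
        instances.add(instance);
    }

    void pauseAll() {
        wantPaused = true;
        for (Pausable p : instances) {
            p.pause();
        }
    }

    void resumeAll() {
        wantPaused = false;
        for (Pausable p : instances) {
            p.resume();
        }
    }

    boolean allPaused() {
        for (Pausable p : instances) {
            if (!p.isPaused()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PauseCoordinator coordinator = new PauseCoordinator();
        coordinator.register(new FakeInstance());
        coordinator.register(new FakeInstance());
        coordinator.pauseAll();
        coordinator.register(new FakeInstance()); // late joiner inherits the pause
        System.out.println("all paused: " + coordinator.allPaused()); // prints all paused: true
    }
}

// Hypothetical handle for one application instance (local or remote).
interface Pausable {
    void pause();
    void resume();
    boolean isPaused();
}

// In-memory stand-in for a real instance handle.
class FakeInstance implements Pausable {
    private boolean paused;
    public void pause() { paused = true; }
    public void resume() { paused = false; }
    public boolean isPaused() { return paused; }
}
```

Late joiners inherit the pause only if they register through the coordinator. This mirrors the new-member question Matthias raises about propagating `pauseAll()` through the rebalance protocol.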
> 2) Would pausing affect standby tasks? For example, imagine there are 3 instances A, B, and C. A user elects to pause instance C only, but it hosts the standby tasks for A. Would the standby tasks on the paused instance continue to read from the changelog topic?

Yes, standby tasks would continue reading from the changelog topic. All consumers would continue reading to avoid getting dropped from their consumer groups.

Cheers,
Jim

> Thanks!
> Bill

On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Guozhang,

Thanks for the feedback; responses inline below:

On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jim,
>
> Thanks for the proposed KIP.
> I have some meta questions about it:
>
> 1) Would an instance always pause/resume all of its currently owned topologies (i.e. the named topologies), or are there scenarios where we only want to pause/resume a subset of them?

An instance may wish to pause some of its named topologies. I was unsure what to say about named topologies in the KIP, since they seem to be an internal detail at the moment.

I intend to add methods like these to KafkaStreamsNamedTopologyWrapper:

    public void pauseNamedTopology(final String topologyToPause)
    public boolean isNamedTopologyPaused(final String topology)
    public void resumeNamedTopology(final String topologyToResume)

> 2) From a user's perspective, do we want to always issue a `pause/resume` to all the instances or not?
> For example, we can define the semantics of the function as "you only need to call this function on any one of the application's instances, and all instances would then pause (via the rebalance error codes)", or as "you would call this function for all the instances of an application". Which one are you referring to?

My initial intent is that one would call this function on whichever instances of the application one wishes to pause. This should allow more control (in case one wanted to pause a portion of the instances). On the other hand, this approach would put more work on the implementer to coordinate calling pause or resume across instances.

If the other option is more suitable, I'm happy to do that instead.

> 3) With EOS, there's a transaction timeout that determines how long a transaction can stay idle before it's force-aborted on the broker side.
> I think when a pause is issued, that means we'd need to immediately commit the current transaction for EOS, since we do not know how long we could pause for. Is that right? If yes, could you please clarify that in the doc as well?

Good point. My intent is for pause() to wait for the next iteration through `runOnce()` and then only skip over the processing for paused tasks in `taskManager.process(numIterations, time)`.

Do commits live inside that call, or do they live across/outside of it? In the former case, I think there shouldn't be any issues with EOS. Otherwise, we may need to work through some details to get EOS right.

Once we figure that out, I can update the KIP.
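Jim describes skipping processing for paused tasks inside the thread loop. The toy below sketches that idea. `ToyStreamThread`, `Task`, and the three-record buffer bound are hypothetical and only loosely modeled on the `runOnce()` / `taskManager.process(...)` split mentioned above. The named-topology methods reuse the names Jim proposes for `KafkaStreamsNamedTopologyWrapper`, but their bodies here are illustrative. Each iteration does three things:

1. It keeps fetching until a task's buffer is full.
2. It skips processing for paused topologies, or for every topology when the whole instance is paused.
3. It still runs the commit step.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical toy version of a stream thread's loop; names and structure
// are illustrative, not the real StreamThread / TaskManager code.
class ToyStreamThread {
    static final int BUFFER_LIMIT = 3; // assumed per-task buffer bound

    static class Task {
        final String topology;
        final Queue<String> partition = new ArrayDeque<>(); // unread input records
        final Queue<String> buffer = new ArrayDeque<>();    // fetched, not processed
        final List<String> output = new ArrayList<>();
        Task(String topology) { this.topology = topology; }
    }

    final List<Task> tasks = new ArrayList<>();
    private final Set<String> pausedTopologies = new HashSet<>();
    private boolean instancePaused = false;
    int commits = 0;

    void pause() { instancePaused = true; }
    void resume() { instancePaused = false; }
    void pauseNamedTopology(String name) { pausedTopologies.add(name); }
    void resumeNamedTopology(String name) { pausedTopologies.remove(name); }
    boolean isNamedTopologyPaused(String name) {
        return instancePaused || pausedTopologies.contains(name);
    }

    void runOnce() {
        // 1. Poll: fetch into each task's buffer until it is full. Fetching
        //    continues while paused, so a paused task's buffer simply fills up.
        for (Task t : tasks) {
            while (t.buffer.size() < BUFFER_LIMIT && !t.partition.isEmpty()) {
                t.buffer.add(t.partition.poll());
            }
        }
        // 2. Process: skip tasks whose topology (or the whole instance) is paused.
        for (Task t : tasks) {
            if (isNamedTopologyPaused(t.topology)) {
                continue;
            }
            while (!t.buffer.isEmpty()) {
                t.output.add(t.buffer.poll().toUpperCase()); // stands in for real processing
            }
        }
        // 3. Commit pathway runs as usual, paused or not.
        commits++;
    }

    public static void main(String[] args) {
        ToyStreamThread thread = new ToyStreamThread();
        Task orders = new Task("orders");
        Task payments = new Task("payments");
        orders.partition.addAll(List.of("o1", "o2", "o3", "o4"));
        payments.partition.addAll(List.of("p1"));
        thread.tasks.add(orders);
        thread.tasks.add(payments);
        thread.pauseNamedTopology("orders");
        thread.runOnce();
        System.out.println(orders.output + " " + orders.buffer + " " + payments.output); // prints [] [o1, o2, o3] [P1]
    }
}
```

In this sketch, a paused task's buffer fills up and fetching for it then stops, without dropping records. This matches Jim's answer to John that consumers would fill up their buffers.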
Thanks,
Jim

> Guozhang

On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi all,

I have written up a KIP for adding the ability to pause and resume the processing of a topology in AK Streams. The KIP is here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832

Thanks in advance for your feedback!

Cheers,
Jim
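Matthias's question at the top of the thread, whether `pause()` may be called before `start()`, comes down to a race that a deterministic toy makes visible. `PauseBeforeStartClient` is hypothetical and single-threaded. To make the race reproducible, it assumes the worst case: the first loop iteration runs before `start()` returns, so the caller gets no chance to call `pause()` in between.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded toy client. start() runs one loop iteration
// before returning, modeling a stream thread that is scheduled before the
// caller can call pause().
class PauseBeforeStartClient {
    private final Queue<String> input;
    private final List<String> output = new ArrayList<>();
    private boolean started = false;
    private boolean paused = false;

    PauseBeforeStartClient(List<String> records) {
        this.input = new ArrayDeque<>(records);
    }

    void pause() { paused = true; }   // allowed both before and after start()
    void resume() { paused = false; }
    boolean isPaused() { return paused; }
    List<String> output() { return output; }

    void start() {
        started = true;
        runOnce();                    // worst case: first iteration runs immediately
    }

    // One loop iteration: process a single record unless not started or paused.
    void runOnce() {
        if (!started || paused || input.isEmpty()) {
            return;
        }
        output.add(input.poll());
    }

    public static void main(String[] args) {
        PauseBeforeStartClient late = new PauseBeforeStartClient(List.of("r1", "r2"));
        late.start();
        late.pause();                 // too late: r1 was already processed

        PauseBeforeStartClient early = new PauseBeforeStartClient(List.of("r1", "r2"));
        early.pause();                // paused before the first iteration
        early.start();
        System.out.println(late.output() + " vs " + early.output()); // prints [r1] vs []
    }
}
```

Calling `pause()` first means the flag is already set when the first iteration runs. No record slips through before the user regains control.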