Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy
*Founder, LittleHorse.dev*

On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> Thanks for the summary, that looks good to me.
>
> Guozhang
>
> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
> >
> > Hello there!
> >
> > Thanks everyone for the comments. There's a lot of back-and-forth going on, so I'll do my best to summarize what everyone's said in TLDR format:
> >
> > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly for the other methods.
> > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > 3. Remove the `earliestOffset` parameter for performance reasons.
> >
> > If that's all fine with everyone, I'll update the KIP and we—well, mostly Edu (: —will open a PR.
> >
> > Cheers,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:
> > >
> > > Hello everyone,
> > >
> > > Thanks for all your feedback on this KIP!
> > >
> > > I think the key to choosing proper names for this API is understanding the terms used inside the StoreChangelogReader. Currently, this class has two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion, StandbyUpdateListener fits these terms better as the name of the interface. The same applies to onUpdateStart/Suspended.
> > >
> > > StoreChangelogReader uses "the same mechanism" for active task restoration and standby task updates, but this is an implementation detail. Under normal circumstances (no rebalances or task migrations), the changelog reader will be in STANDBY_UPDATING, which means it will keep updating standby tasks as long as there are new records in the changelog topic. That's why I prefer onStandbyUpdated over onBatchUpdated, even if it doesn't 100% align with StateRestoreListener, but either one is fine.
> > >
> > > Edu
> > >
> > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> > > >
> > > > Hello Colt,
> > > >
> > > > Thanks for writing the KIP! I have read through the updated KIP and overall it looks great. I only have minor naming comments (well, isn't naming the least boring stuff to discuss, and the part that takes up most of the time for KIPs :P):
> > > >
> > > > 1. I tend to agree with Sophie regarding whether or not to include "Standby" in the function names "onStandbyUpdateStart/Suspended", since dropping it is also more consistent with the functions of "StateRestoreListener", where we do not name them "onStateRestoreStart" etc.
> > > >
> > > > 2. I know in community discussions we sometimes say "a standby is promoted to active", but in the official code/Javadocs we do not have the term "promotion", since what the code really does is recycle the task (while keeping its state stores open) and create a new active task that takes in the recycled state stores and just changes other fields like the task type. After thinking about this for a bit, I tend to feel that "promoted" is indeed a better name for user-facing purposes, while "recycle" is more of a technical detail inside the code that can be abstracted away from users. So I feel keeping the name "PROMOTED" is fine.
> > > >
> > > > 3. Regarding "earliestOffset", it does feel like we cannot always avoid another call to the Kafka API. On the other hand, I also tend to think that such bookkeeping may be better done at the app level than at the Streams public API level. I.e.
> > > > the app could keep a "first ever starting offset" per "topic-partition-store" key, and when we have a rolling restart and hence some standby task keeps "jumping" from one client to another via task assignment, the app would update this value just once, when it finds that the "topic-partition-store" key was never triggered before. What do you think?
> > > >
> > > > 4. I do not have a strong opinion either, but what about "onBatchUpdated"?
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:
> > > > >
> > > > > Sophie,
> > > > >
> > > > > Thank you very much for such a detailed review of the KIP. It might actually be longer than the original KIP in the first place!
> > > > >
> > > > > 1. Ack'ed and fixed.
> > > > >
> > > > > 2. Correct, this is a confusing passage and requires context:
> > > > >
> > > > > One thing on our list of TODOs regarding reliability is to determine how to configure `session.timeout.ms`. In our Kubernetes environment, an instance of our Streams app can be terminated, restarted, and get back into the "RUNNING" Streams state in about 20 seconds. We have two options here:
> > > > > a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of unavailability for affected partitions, but avoid shuffling Tasks; or
> > > > > b) set session.timeout.ms to a low value, such as 6 seconds (heartbeat.interval.ms of 2000), and reduce the unavailability window during a rolling bounce, but incur an "extra" rebalance.
> > > > > There are several different costs to a rebalance, including the shuffling of standby tasks. JMX metrics are not fine-grained enough to give us an accurate picture of what's going on with the whole Standby Task Shuffle Dance.
> > > > > I hypothesize that the Standby Update Listener might help us clarify just how the shuffling actually (not theoretically) works, which will help us make a more informed decision about the session timeout config.
> > > > >
> > > > > If you think this is worth putting in the KIP, I'll polish it and do so; otherwise, I'll remove the current half-baked explanation.
> > > > >
> > > > > 3. Overall, I agree with this. In our app, each Task has only one Store, to reduce the number of changelog partitions, so I sometimes forget the distinction between the two concepts, as reflected in the KIP (:
> > > > >
> > > > > 3a. I don't like the word "Restore" here, since Restoration refers to an Active Task getting caught up in preparation to resume processing. `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a native Python speaker, so I do prefer shorter names anyway (:
> > > > >
> > > > > 3b1. +1 to removing the word 'Task'.
> > > > >
> > > > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer `onStandbyUpdateStart()`, which matches the name of the interface "StandbyUpdateListener". (The Python part of me hates this, however.)
> > > > >
> > > > > 3b3. Going back to question 2), `earliestOffset` was intended to allow us to more easily calculate the amount of state _already loaded_ in the store by computing (startingOffset - earliestOffset). This would help us see how much inefficiency is introduced in a rolling restart. If we start with an up-to-date standby before the restart, and after the whole restart the Task is shuffled onto an instance with no previous state, then that is expensive.
> > > > > However, if the final shuffling puts the Task back on an instance with a lot of pre-built state, it's not expensive.
> > > > >
> > > > > If a call over the network is required to determine the earliestOffset, then this is a "hard no-go" for me, and we will remove it (I'll have to check with Eduwer, as he is close to having a working implementation). I think we can probably determine what we wanted to see in a different way, but it will take more thinking. If `earliestOffset` is confusing, perhaps rename it to `earliestChangelogOffset`?
> > > > >
> > > > > `startingOffset` is easy to remove, as it can be determined from the first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > > >
> > > > > Anyway, I've updated the JavaDoc in the interface; hopefully it's clearer. Awaiting further instructions here.
> > > > >
> > > > > 3c. Good point; after thinking it over, my order of preference is `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am less fond of "processed" because when I was first learning Streams, I mistakenly thought that standby tasks actually processed the input topic rather than loading from the changelog. I'll defer to you here.
> > > > >
> > > > > 3d. +1 to `onUpdateSuspended()`, or better yet `onStandbyUpdateSuspended()`. I will check on the implementation of keeping track of the number of records loaded.
> > > > >
> > > > > 4a. I think this might be best in a separate KIP, especially given that this is my and Eduwer's first time contributing to Kafka (so we want to minimize the blast radius).
> > > > >
> > > > > 4b. I might respectfully (and timidly) push back here: RECYCLED for an Active Task is a bit confusing to me.
> > > > > DEMOTED and MIGRATED make sense from the standpoint of an Active Task; recycling, to me, sounds like throwing stuff away so that the resources (i.e. disk space) can be used by a separate Task. As an alternative to reusing the same enum, maybe rename it to `StandbySuspendReason` to avoid naming conflicts with `ActiveSuspendReason`? However, I could be convinced to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.
> > > > >
> > > > > TLDR:
> > > > >
> > > > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > > > T2. Will update to `onStandbyUpdateStart()`.
> > > > > T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > > > T4. I don't like `onBatchProcessed()` too much; perhaps `onBatchLoaded()`?
> > > > > T5. Will update to `onStandbyUpdateSuspended()`.
> > > > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather than renaming PROMOTED to RECYCLED? @Eduwer?
> > > > >
> > > > > Long Live the Otter,
> > > > > Colt McNealy
> > > > >
> > > > > *Founder, LittleHorse.dev*
> > > > >
> > > > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> > > > > >
> > > > > > Hey Colt! Thanks for the KIP -- this will be a great addition to Streams; I can't believe we've gone so long without this.
> > > > > >
> > > > > > Overall the proposal makes sense, but I had a handful of fairly minor questions and suggestions/requests:
> > > > > >
> > > > > > 1. Seems like the last sentence in the 2nd paragraph of the Motivation section is cut off and incomplete -- "want to be able to know" what exactly?
> > > > > >
> > > > > > 2.
> > > > > > This isn't that important, since the motivation as a whole is clear to me and convincing enough, but I'm not quite sure I understand the example at the end of the Motivation section. How are standby tasks (and the ability to hook into and monitor their status) related to the session.timeout.ms config?
> > > > > >
> > > > > > 3. To help both old and new users of Kafka Streams understand this new restore listener and its purpose/semantics, can we try to name the class and callbacks in a way that's more consistent with the active task restore listener?
> > > > > >
> > > > > > 3a. StandbyTaskUpdateListener:
> > > > > > The existing restore listener is called StateRestoreListener, so the new one could be called something like StandbyStateRestoreListener, although we typically refer to standby tasks as "processing" rather than "restoring" records -- i.e. restoration is a term for active task state specifically. I actually like the original suggestion if we just drop the "Task" part of the name, i.e. StandbyUpdateListener. I think either that or StandbyRestoreListener would be fine; they are probably the two best options.
> > > > > > Also, this probably goes without saying, but any change to the name of this class should of course be reflected in the KafkaStreams#setXXX API as well.
> > > > > >
> > > > > > 3b. #onTaskCreated
> > > > > > I know the "start" callback feels a bit different for the standby task updater vs. an active task beginning restoration, but I think we should try to keep the various callbacks aligned with their active restore listener counterparts.
> > > > > > We can/should just replace the term "restore" with "update" for the callback method names, the same way we do for the class name, which in this case would give us #onUpdateStart. Personally I like this better, but it's ultimately up to you. However, I would push back against anything that includes the word "Task" (e.g. #onTaskCreated), as the listener is actually not scoped to the task itself but to the individual state store(s). This is the main reason I would prefer calling it something like #onUpdateStart, which keeps the focus on the store being updated rather than the task that just happens to own this store.
> > > > > > One last thing on this callback -- do we really need both `earliestOffset` and `startingOffset`? I feel like this might be more confusing than it is helpful (tbh even I'm not completely sure I know what the earliestOffset is supposed to represent). More importantly, is all this information already available in Streams and able to be passed in to the callback? I haven't checked on this, but it feels like the earliestOffset is likely to require a remote call, either by the embedded consumer or via the admin client. If so, the ROI on including this parameter seems quite low (if not outright negative).
> > > > > >
> > > > > > 3c. #onBatchRestored
> > > > > > If we opt to use the term "update" in place of "restore" elsewhere, then we should consider doing so here as well. What do you think about #onBatchUpdated, or even #onBatchProcessed?
> > > > > >
> > > > > > I'm actually not super concerned about this particular API, and honestly I think we can use restore or update interchangeably here, so if you don't like any of the suggested names (and no one can think of anything better), I would just stick with #onBatchRestored. In this case, it kind of makes the most sense.
> > > > > >
> > > > > > 3d. #onTaskSuspended
> > > > > > Along the same lines as 3b above, #onUpdateSuspended or just #onRestoreSuspended probably makes more sense for this callback. Also, I notice the StateRestoreListener passes in the total number of records restored to its #onRestoreSuspended. Assuming we already track that information in Streams and have it readily available to pass in at whatever point we would be invoking this callback, that might be a useful parameter for the standby listener to have as well.
> > > > > >
> > > > > > 4. I totally love the SuspendReason thing, just two notes/requests:
> > > > > >
> > > > > > 4a. Feel free to push back against adding onto the scope of this KIP, but it would be great to expand the active state restore listener with this SuspendReason enum as well. It would be really useful for both variants of restore listener.
> > > > > >
> > > > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby tasks it means basically the same thing; the point is that active tasks can also be recycled into standbys through the same mechanism. This way they can share the SuspendReason enum -- not that it's necessary for them to share, I just think it would be a good idea to keep the two restore listeners aligned as closely as we can.
> > > > > >
> > > > > > I was actually considering proposing a short KIP with a new RecyclingListener (or something) specifically for this exact kind of thing, since we currently have literally zero insight into the recycling process. It's practically impossible to tell when a store has been converted from active to standby, or vice versa. So having access to the SuspendReason, and more importantly having a callback guaranteed to notify you when a state store is recycled, whether active or standby, would be amazing.
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > > > >
> > > > > > ---------- Forwarded message ---------
> > > > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > > > To: <dev@kafka.apache.org>
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > We would like to propose a small KIP to improve the ability of Streams apps to monitor the progress of their standby tasks through a callback interface.
> > > > > > >
> > > > > > > We have a nearly-working implementation on our fork and are curious for feedback.
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > > > >
> > > > > > > Thank you,
> > > > > > > Colt McNealy
> > > > > > >
> > > > > > > *Founder, LittleHorse.dev*
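
The callbacks from the TLDR summary can be combined with Guozhang's app-level bookkeeping suggestion (point 3 of his Oct 13 reply), which replaces the removed `earliestOffset` parameter. The following is a minimal, dependency-free Java sketch of that combination under stated assumptions. It is not the KIP's final API. The interface `StandbyUpdateCallbacks`, the class `StandbyProgressTracker`, and every parameter list are hypothetical. The batch callback uses `onBatchLoaded`, which is only one of the names floated in the thread. The changelog topic-partition is modeled as a plain `String` rather than Kafka's `TopicPartition`. The first-ever starting offset is kept in an in-memory map, so the tracker only sees the current instance; tracking it across instances, as Guozhang describes, would need shared storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the enum discussed in the thread (PROMOTED / MIGRATED kept).
enum SuspendReason { PROMOTED, MIGRATED }

// Hypothetical approximation of the renamed callbacks; parameter lists are assumptions,
// and the topic-partition is a plain String instead of Kafka's TopicPartition.
interface StandbyUpdateCallbacks {
    void onUpdateStart(String topicPartition, String storeName, long startingOffset);
    void onBatchLoaded(String topicPartition, String storeName, long batchEndOffset, long numRecords);
    void onUpdateSuspended(String topicPartition, String storeName, long storeOffset, SuspendReason reason);
}

// App-level bookkeeping: remember the first-ever starting offset per
// "topic-partition-store" key instead of asking Streams for an earliestOffset.
class StandbyProgressTracker implements StandbyUpdateCallbacks {
    private final Map<String, Long> firstStartingOffset = new HashMap<>();
    private final Map<String, Long> lastStartingOffset = new HashMap<>();
    private final Map<String, Long> recordsLoaded = new HashMap<>();

    private static String key(String topicPartition, String storeName) {
        return topicPartition + "/" + storeName;
    }

    @Override
    public void onUpdateStart(String topicPartition, String storeName, long startingOffset) {
        String k = key(topicPartition, storeName);
        // Set only the first time this key is ever seen; later update rounds leave it untouched.
        firstStartingOffset.putIfAbsent(k, startingOffset);
        lastStartingOffset.put(k, startingOffset);
    }

    @Override
    public void onBatchLoaded(String topicPartition, String storeName, long batchEndOffset, long numRecords) {
        // Accumulate the number of records loaded across all batches for this key.
        recordsLoaded.merge(key(topicPartition, storeName), numRecords, Long::sum);
    }

    @Override
    public void onUpdateSuspended(String topicPartition, String storeName, long storeOffset, SuspendReason reason) {
        // A real app might log or export a metric here; this sketch ignores the call.
    }

    // Offset span already covered when the latest update round started, measured
    // from the first-ever starting offset (an offset span, not an exact record count).
    long offsetsAlreadyLoaded(String topicPartition, String storeName) {
        String k = key(topicPartition, storeName);
        return lastStartingOffset.getOrDefault(k, 0L) - firstStartingOffset.getOrDefault(k, 0L);
    }

    long totalRecordsLoaded(String topicPartition, String storeName) {
        return recordsLoaded.getOrDefault(key(topicPartition, storeName), 0L);
    }
}
```

Consider a store whose first update round started at offset 100, and whose later round (after the task moved back to an instance with pre-built state) started at offset 400. For that store, `offsetsAlreadyLoaded` returns 300. A value near zero after a restart would instead indicate the expensive case Colt describes, where the task landed on an instance with no previous state. The result is an offset span rather than a record count, because changelog offsets can have gaps (for example, from compaction or transaction markers).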