Hi Bruno,

Thanks for your observation. You are right: knowing this "endOffset"
requires a network call through the admin client, and that will have an
impact on performance. Ideally we would find a solution with zero
performance impact, but unfortunately I don't see one, so the goal is to
keep the impact low. We can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I understand, this update
is done periodically, using the "commit.interval.ms" configuration. I
believe this option would force us to invoke the StandbyUpdateListener
once this interval is reached.
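To make the periodic refresh described above concrete, here is a minimal
sketch of the idea. It is not the actual StoreChangelogReader code: the
class name `CachedEndOffset`, the `remoteLookup` supplier (a stand-in for
the admin-client call) and the `refreshIntervalMs` field (a stand-in for
"commit.interval.ms") are all assumptions made for illustration.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper, not part of Kafka Streams: caches a changelog end
 * offset and refreshes it at most once per interval, so the (simulated)
 * remote lookup is not paid on every batch.
 */
public class CachedEndOffset {
    private final LongSupplier remoteLookup; // stand-in for an admin-client call
    private final long refreshIntervalMs;    // stand-in for commit.interval.ms
    private long cachedEndOffset = -1L;
    private long lastRefreshMs = 0L;
    private boolean initialized = false;
    private int remoteCalls = 0;

    public CachedEndOffset(LongSupplier remoteLookup, long refreshIntervalMs) {
        this.remoteLookup = remoteLookup;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    /** Returns the cached end offset, refreshing it only when the interval has elapsed. */
    public long get(long nowMs) {
        if (!initialized || nowMs - lastRefreshMs >= refreshIntervalMs) {
            cachedEndOffset = remoteLookup.getAsLong();
            lastRefreshMs = nowMs;
            initialized = true;
            remoteCalls++;
        }
        return cachedEndOffset;
    }

    /** Number of simulated remote lookups performed so far. */
    public int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        long[] brokerEndOffset = {100L}; // simulated end offset on the broker
        CachedEndOffset cache = new CachedEndOffset(() -> brokerEndOffset[0], 30_000L);

        System.out.println(cache.get(0L));      // 100: first call performs the lookup
        brokerEndOffset[0] = 250L;              // the changelog grows on the broker
        System.out.println(cache.get(10_000L)); // 100: still within the interval
        System.out.println(cache.get(30_000L)); // 250: interval elapsed, lookup again
        System.out.println(cache.remoteCalls()); // 2
    }
}
```

The trade-off this illustrates is that the value handed to the listener can
be stale by up to one interval, in exchange for a bounded number of remote
calls.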
On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:
> Thanks for the KIP, Colt and Eduwer,
>
> Are you sure there is also not a significant performance impact for
> passing into the callback `currentEndOffset`?
>
> I am asking because the comment here:
>
> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
>
> says that the end-offset is only updated once for standby tasks whose
> changelog topic is not piggy-backed on input topics. I could also not
> find the update of end-offset for those standbys.
>
>
> Best,
> Bruno
>
> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> > Hi all,
> >
> > it's a nice improvement! I don't have anything to add on top of the
> > previous comments, just came here to say that it seems to me consensus
> > has been reached and the result looks good to me.
> >
> > Thanks Colt and Eduwer!
> > Lucas
> >
> > On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io> wrote:
> >>
> >> Thanks, Guozhang. I've updated the KIP and will start a vote.
> >>
> >> Colt McNealy
> >>
> >> *Founder, LittleHorse.dev*
> >>
> >>
> >> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang
> >> <guozhang.wang...@gmail.com> wrote:
> >>
> >>> Thanks for the summary, that looks good to me.
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
> >>>>
> >>>> Hello there!
> >>>>
> >>>> Thanks everyone for the comments. There's a lot of back-and-forth
> >>>> going on, so I'll do my best to summarize what everyone's said in
> >>>> TLDR format:
> >>>>
> >>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do
> >>>> similarly for the other methods.
> >>>> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> >>>> 3. Remove the `earliestOffset` parameter for performance reasons.
> >>>>
> >>>> If that's all fine with everyone, I'll update the KIP and we -- well,
> >>>> mostly Edu (: -- will open a PR.
> >>>>
> >>>> Cheers,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro
> >>>> <edu...@littlehorse.io> wrote:
> >>>>
> >>>>> Hello everyone,
> >>>>>
> >>>>> Thanks for all your feedback for this KIP!
> >>>>>
> >>>>> I think that the key to choosing proper names for this API is
> >>>>> understanding the terms used inside the StoreChangelogReader.
> >>>>> Currently, this class has two possible states: ACTIVE_RESTORING and
> >>>>> STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the
> >>>>> interface fits better on these terms. Same applies for
> >>>>> onUpdateStart/Suspended.
> >>>>>
> >>>>> StoreChangelogReader uses "the same mechanism" for active task
> >>>>> restoration and standby task updates, but this is an implementation
> >>>>> detail. Under normal circumstances (no rebalances or task
> >>>>> migrations), the changelog reader will be in STANDBY_UPDATING, which
> >>>>> means it will be updating standby tasks as long as there are new
> >>>>> records in the changelog topic. That's why I prefer onStandbyUpdated
> >>>>> instead of onBatchUpdated, even if it doesn't 100% align with
> >>>>> StateRestoreListener, but either one is fine.
> >>>>>
> >>>>> Edu
> >>>>>
> >>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang
> >>>>> <guozhang.wang...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello Colt,
> >>>>>>
> >>>>>> Thanks for writing the KIP! I have read through the updated KIP and
> >>>>>> overall it looks great. I only have minor naming comments (well,
> >>>>>> isn't naming the least boring stuff to discuss and that takes the
> >>>>>> most of the time for KIPs :P):
> >>>>>>
> >>>>>> 1. I tend to agree with Sophie regarding whether or not to include
> >>>>>> "Standby" in the functions of "onStandbyUpdateStart/Suspended",
> >>>>>> since it is also more consistent with the functions of
> >>>>>> "StateRestoreListener" where we do not name it as
> >>>>>> "onStateRestoreState" etc.
> >>>>>>
> >>>>>> 2. I know in community discussions we sometimes say "a standby is
> >>>>>> promoted to active", but in the official code / java docs we did not
> >>>>>> have a term of "promotion", since what the code does is really
> >>>>>> recycle the task (while keeping its state stores open), and create a
> >>>>>> new active task that takes in the recycled state stores and just
> >>>>>> changing the other fields like task type etc. After thinking about
> >>>>>> this for a bit, I tend to feel that "promoted" is indeed a better
> >>>>>> name for user facing purposes while "recycle" is more of a technical
> >>>>>> detail inside the code and could be abstracted away from users. So I
> >>>>>> feel keeping the name "PROMOTED" is fine.
> >>>>>>
> >>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> >>>>>> avoid another call to the Kafka API. And on the other hand, I also
> >>>>>> tend to think that such bookkeeping may be better done at the app
> >>>>>> level than from the Streams' public API level. I.e. the app could
> >>>>>> keep a "first ever starting offset" per "topic-partition-store" key,
> >>>>>> and when we have a rolling restart and hence some standby task keeps
> >>>>>> "jumping" from one client to another via task assignment, the app
> >>>>>> would update this value just once when it finds the
> >>>>>> "topic-partition-store" was never triggered before. What do you
> >>>>>> think?
> >>>>>>
> >>>>>> 4. I do not have a strong opinion either, but what about
> >>>>>> "onBatchUpdated"?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Sophie,
> >>>>>>>
> >>>>>>> Thank you very much for such a detailed review of the KIP. It might
> >>>>>>> actually be longer than the original KIP in the first place!
> >>>>>>>
> >>>>>>> 1. Ack'ed and fixed.
> >>>>>>>
> >>>>>>> 2. Correct, this is a confusing passage and requires context:
> >>>>>>>
> >>>>>>> One thing on our list of TODO's regarding reliability is to
> >>>>>>> determine how to configure `session.timeout.ms`. In our Kubernetes
> >>>>>>> Environment, an instance of our Streams App can be terminated,
> >>>>>>> restarted, and get back into the "RUNNING" Streams state in about
> >>>>>>> 20 seconds. We have two options here:
> >>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal with 20
> >>>>>>> seconds of unavailability for affected partitions, but avoid
> >>>>>>> shuffling Tasks; or
> >>>>>>> b) set session.timeout.ms to a low value, such as 6 seconds
> >>>>>>> (heartbeat.interval.ms of 2000), and reduce the unavailability
> >>>>>>> window during a rolling bounce but incur an "extra" rebalance.
> >>>>>>> There are several different costs to a rebalance, including the
> >>>>>>> shuffling of standby tasks. JMX metrics are not fine-grained enough
> >>>>>>> to give us an accurate picture of what's going on with the whole
> >>>>>>> Standby Task Shuffle Dance. I hypothesize that the Standby Update
> >>>>>>> Listener might help us clarify just how the shuffling actually (not
> >>>>>>> theoretically) works, which will help us make a more informed
> >>>>>>> decision about the session timeout config.
> >>>>>>>
> >>>>>>> If you think this is worth putting in the KIP, I'll polish it and
> >>>>>>> do so; else, I'll remove the current half-baked explanation.
> >>>>>>>
> >>>>>>> 3. Overall, I agree with this. In our app, each Task has only one
> >>>>>>> Store to reduce the number of changelog partitions, so I sometimes
> >>>>>>> forget the distinction between the two concepts, as reflected in
> >>>>>>> the KIP (:
> >>>>>>>
> >>>>>>> 3a. I don't like the word "Restore" here, since Restoration refers
> >>>>>>> to an Active Task getting caught up in preparation to resume
> >>>>>>> processing. `StandbyUpdateListener` is fine by me; I have updated
> >>>>>>> the KIP. I am a native Python speaker so I do prefer shorter names
> >>>>>>> anyways (:
> >>>>>>>
> >>>>>>> 3b1. +1 to removing the word 'Task'.
> >>>>>>>
> >>>>>>> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> >>>>>>> `onStandbyUpdateStart()` which matches the name of the Interface
> >>>>>>> "StandbyUpdateListener". (the python part of me hates this,
> >>>>>>> however)
> >>>>>>>
> >>>>>>> 3b3. Going back to question 2), `earliestOffset` was intended to
> >>>>>>> allow us to more easily calculate the amount of state _already
> >>>>>>> loaded_ in the store by subtracting (startingOffset -
> >>>>>>> earliestOffset). This would help us see how much inefficiency is
> >>>>>>> introduced in a rolling restart: if we end up going from a
> >>>>>>> situation with an up-to-date standby before the restart, and then
> >>>>>>> after the whole restart, the Task is shuffled onto an instance
> >>>>>>> where there is no previous state, then that is expensive. However,
> >>>>>>> if the final shuffling results in the Task back on an instance with
> >>>>>>> a lot of pre-built state, it's not expensive.
> >>>>>>>
> >>>>>>> If a call over the network is required to determine the
> >>>>>>> earliestOffset, then this is a "hard no-go" for me, and we will
> >>>>>>> remove it (I'll have to check with Eduwer as he is close to having
> >>>>>>> a working implementation).
I > >>>>>>> think we can probably determine what we wanted to see in a > >>> different > >>>>>>> way, but it will take more thinking.. If `earliestOffset` is > >>> confusing, > >>>>>>> perhaps rename it to `earliestChangelogOffset`? > >>>>>>> > >>>>>>> `startingOffset` is easy to remove as it can be determined from the > >>>>> first > >>>>>>> call to `onBatch{Restored/Updated/Processed/Loaded}()`. > >>>>>>> > >>>>>>> Anyways, I've updated the JavaDoc in the interface; hopefully it's > >>> more > >>>>>>> clear. Awaiting further instructions here. > >>>>>>> > >>>>>>> 3c. Good point; after thinking, my preference is > >>> `onBatchLoaded()` -> > >>>>>>> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. > >>> I am > >>>>>>> less fond of "processed" because when I was first learning Streams > >>> I > >>>>>>> mistakenly thought that standby tasks actually processed the input > >>>>> topic > >>>>>>> rather than loaded from the changelog. I'll defer to you here. > >>>>>>> > >>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet > >>>>>>> `onStandbyUpdateSuspended()`. Will check about the implementation > >>> of > >>>>>>> keeping track of the number of records loaded. > >>>>>>> > >>>>>>> 4a. I think this might be best in a separate KIP, especially given > >>> that > >>>>>>> this is my and Eduwer's first time contributing to Kafka (so we > >>> want to > >>>>>>> minimize the blast radius). > >>>>>>> > >>>>>>> 4b. I might respectfully (and timidly) push back here, RECYCLED > >>> for an > >>>>>>> Active Task is a bit confusing to me. DEMOTED and MIGRATED make > >>> sense > >>>>>> from > >>>>>>> the standpoint of an Active Task, recycling to me sounds like > >>> throwing > >>>>>>> stuff away, such that the resources (i.e. disk space) can be used > >>> by a > >>>>>>> separate Task. 
> >>>>>>> As an alternative rather than trying to reuse the same enum, maybe
> >>>>>>> rename it to `StandbySuspendReason` to avoid naming conflicts with
> >>>>>>> `ActiveSuspendReason`? However, I could be convinced to rename
> >>>>>>> PROMOTED -> RECYCLED, especially if Eduwer agrees.
> >>>>>>>
> >>>>>>> TLDR:
> >>>>>>>
> >>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> >>>>>>> T2. Will update to `onStandbyUpdateStart()`
> >>>>>>> T3. Awaiting further instructions on earliestOffset and
> >>>>>>> startingOffset.
> >>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps
> >>>>>>> `onBatchLoaded()`?
> >>>>>>> T5. Will update to `onStandbyUpdateSuspended()`
> >>>>>>> T6. Thoughts on renaming SuspendReason to StandbySuspendReason,
> >>>>>>> rather than renaming PROMOTED to RECYCLED? @Eduwer?
> >>>>>>>
> >>>>>>> Long Live the Otter,
> >>>>>>> Colt McNealy
> >>>>>>>
> >>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman
> >>>>>>> <sop...@responsive.dev> wrote:
> >>>>>>>
> >>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great addition to
> >>>>>>>> Streams, I can't believe we've gone so long without this.
> >>>>>>>>
> >>>>>>>> Overall the proposal makes sense, but I had a handful of fairly
> >>>>>>>> minor questions and suggestions/requests
> >>>>>>>>
> >>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the
> >>>>>>>> Motivation section is cut off and incomplete -- "want to be able
> >>>>>>>> to know " what exactly?
> >>>>>>>>
> >>>>>>>> 2. This isn't that important since the motivation as a whole is
> >>>>>>>> clear to me and convincing enough, but I'm not quite sure I
> >>>>>>>> understand the example at the end of the Motivation section.
> >>>>>>>> How are standby tasks (and the ability to hook into and monitor
> >>>>>>>> their status) related to the session.timeout.ms config?
> >>>>>>>>
> >>>>>>>> 3. To help both old and new users of Kafka Streams understand this
> >>>>>>>> new restore listener and its purpose/semantics, can we try to name
> >>>>>>>> the class and callbacks in a way that's more consistent with the
> >>>>>>>> active task restore listener?
> >>>>>>>>
> >>>>>>>> 3a. StandbyTaskUpdateListener:
> >>>>>>>> The existing restore listener is called StateRestoreListener, so
> >>>>>>>> the new one could be called something like
> >>>>>>>> StandbyStateRestoreListener. Although we typically refer to
> >>>>>>>> standby tasks as "processing" rather than "restoring" records --
> >>>>>>>> ie restoration is a term for active task state specifically. I
> >>>>>>>> actually like the original suggestion if we just drop the "Task"
> >>>>>>>> part of the name, ie StandbyUpdateListener. I think either that or
> >>>>>>>> StandbyRestoreListener would be fine and probably the two best
> >>>>>>>> options.
> >>>>>>>> Also, this probably goes without saying but any change to the name
> >>>>>>>> of this class should of course be reflected in the
> >>>>>>>> KafkaStreams#setXXX API as well
> >>>>>>>>
> >>>>>>>> 3b. #onTaskCreated
> >>>>>>>> I know the "start" callback feels a bit different for the standby
> >>>>>>>> task updater vs an active task beginning restoration, but I think
> >>>>>>>> we should try to keep the various callbacks aligned to their
> >>>>>>>> active restore listener counterpart. We can/should just replace
> >>>>>>>> the term "restore" with "update" for the callback method names the
> >>>>>>>> same way we do for the class name, which in this case would give
> >>>>>>>> us #onUpdateStart.
> >>>>>>>> Personally I like this better, but it's ultimately up to you.
> >>>>>>>> However, I would push back against anything that includes the word
> >>>>>>>> "Task" (eg #onTaskCreated) as the listener is actually not scoped
> >>>>>>>> to the task itself but instead to the individual state store(s).
> >>>>>>>> This is the main reason I would prefer calling it something like
> >>>>>>>> #onUpdateStart, which keeps the focus on the store being updated
> >>>>>>>> rather than the task that just happens to own this store
> >>>>>>>> One last thing on this callback -- do we really need both the
> >>>>>>>> `earliestOffset` and `startingOffset`? I feel like this might be
> >>>>>>>> more confusing than it is helpful (tbh even I'm not completely
> >>>>>>>> sure I know what the earliestOffset is supposed to represent) More
> >>>>>>>> importantly, is this all information that is already available and
> >>>>>>>> able to be passed in to the callback by Streams? I haven't checked
> >>>>>>>> on this but it feels like the earliestOffset is likely to require
> >>>>>>>> a remote call, either by the embedded consumer or via the admin
> >>>>>>>> client. If so, the ROI on including this parameter seems quite low
> >>>>>>>> (if not outright negative)
> >>>>>>>>
> >>>>>>>> 3c. #onBatchRestored
> >>>>>>>> If we opt to use the term "update" in place of "restore"
> >>>>>>>> elsewhere, then we should consider doing so here as well. What do
> >>>>>>>> you think about #onBatchUpdated, or even #onBatchProcessed?
> >>>>>>>> I'm actually not super concerned about this particular API, and
> >>>>>>>> honestly I think we can use restore or update interchangeably
> >>>>>>>> here, so if you don't like any of the suggested names (and no one
> >>>>>>>> can think of anything better), I would just stick with
> >>>>>>>> #onBatchRestored. In this case, it kind of makes the most sense.
> >>>>>>>>
> >>>>>>>> 3d. #onTaskSuspended
> >>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or just
> >>>>>>>> #onRestoreSuspended probably makes more sense for this callback.
> >>>>>>>> Also, I notice the StateRestoreListener passes in the total number
> >>>>>>>> of records restored to its #onRestoreSuspended. Assuming we
> >>>>>>>> already track that information in Streams and have it readily
> >>>>>>>> available to pass in at whatever point we would be invoking this
> >>>>>>>> callback, that might be a useful parameter for the standby
> >>>>>>>> listener to have as well
> >>>>>>>>
> >>>>>>>> 4. I totally love the SuspendReason thing, just two
> >>>>>>>> notes/requests:
> >>>>>>>>
> >>>>>>>> 4a. Feel free to push back against adding onto the scope of this
> >>>>>>>> KIP, but it would be great to expand the active state restore
> >>>>>>>> listener with this SuspendReason enum as well. It would be really
> >>>>>>>> useful for both variants of restore listener
> >>>>>>>>
> >>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> >>>>>>>> standby tasks it means basically the same thing, the point is that
> >>>>>>>> active tasks can also be recycled into standbys through the same
> >>>>>>>> mechanism. This way they can share the SuspendReason enum -- not
> >>>>>>>> that it's necessary for them to share, I just think it would be a
> >>>>>>>> good idea to keep the two restore listeners aligned to the highest
> >>>>>>>> degree possible for as we can.
> >>>>>>>> I was actually considering proposing a short KIP with a new
> >>>>>>>> RecyclingListener (or something) specifically for this exact kind
> >>>>>>>> of thing, since we currently have literally zero insight into the
> >>>>>>>> recycling process.
> >>>>>>>> It's practically impossible to tell when a store has been
> >>>>>>>> converted from active to standby, or vice versa. So having access
> >>>>>>>> to the SuspendReason, and more importantly having a callback
> >>>>>>>> guaranteed to notify you when a state store is recycled whether
> >>>>>>>> active or standby, would be amazing.
> >>>>>>>>
> >>>>>>>> Thanks for the KIP!
> >>>>>>>>
> >>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> ---------- Forwarded message ---------
> >>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> >>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> >>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> >>>>>>>>> To: <dev@kafka.apache.org>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> We would like to propose a small KIP to improve the ability of
> >>>>>>>>> Streams apps to monitor the progress of their standby tasks
> >>>>>>>>> through a callback interface.
> >>>>>>>>>
> >>>>>>>>> We have a nearly-working implementation on our fork and are
> >>>>>>>>> curious for feedback.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>> Colt McNealy
> >>>>>>>>>
> >>>>>>>>> *Founder, LittleHorse.dev*