Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.
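
To make the three points concrete, here is a rough Java sketch of how the
listener might look after these changes, together with the app-level
bookkeeping Guozhang suggested in place of `earliestOffset`. Everything in it
is an assumption for illustration: the parameter lists, the
`FirstOffsetTracker` class, and the stand-in `TopicPartition` type are not the
KIP's final text, and the batch callback is left out because its name is
still under discussion.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StandbyListenerSketch {

    /** Stand-in for Kafka's TopicPartition so the sketch compiles on its own. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Assumed shape of the listener; parameter lists are illustrative only. */
    interface StandbyUpdateListener {
        enum SuspendReason { MIGRATED, PROMOTED }

        // No earliestOffset here: only the offset the standby starts loading from.
        void onUpdateStart(TopicPartition tp, String storeName, long startingOffset);

        void onUpdateSuspended(TopicPartition tp, String storeName,
                               long storeOffset, SuspendReason reason);
    }

    /** Hypothetical app-level bookkeeping of the first starting offset per store. */
    static final class FirstOffsetTracker implements StandbyUpdateListener {
        private final Map<String, Long> firstStartingOffset = new ConcurrentHashMap<>();

        private static String key(TopicPartition tp, String storeName) {
            return tp.topic + "-" + tp.partition + "-" + storeName;
        }

        @Override
        public void onUpdateStart(TopicPartition tp, String storeName, long startingOffset) {
            // putIfAbsent records the value only the first time this key is seen.
            firstStartingOffset.putIfAbsent(key(tp, storeName), startingOffset);
        }

        @Override
        public void onUpdateSuspended(TopicPartition tp, String storeName,
                                      long storeOffset, SuspendReason reason) {
            System.out.printf("%s suspended at offset %d (%s)%n",
                    key(tp, storeName), storeOffset, reason);
        }

        /** Returns the first starting offset seen for this key, or null if none. */
        Long firstOffset(TopicPartition tp, String storeName) {
            return firstStartingOffset.get(key(tp, storeName));
        }
    }

    public static void main(String[] args) {
        FirstOffsetTracker tracker = new FirstOffsetTracker();
        TopicPartition tp = new TopicPartition("my-store-changelog", 0);

        tracker.onUpdateStart(tp, "my-store", 100L);
        tracker.onUpdateStart(tp, "my-store", 250L); // later start keeps the first value
        tracker.onUpdateSuspended(tp, "my-store", 300L,
                StandbyUpdateListener.SuspendReason.PROMOTED);

        System.out.println("first starting offset: " + tracker.firstOffset(tp, "my-store"));
    }
}
```

The tracker is in-memory and per instance, so it only approximates the
"first ever starting offset" idea; a real app would still have to decide
where to persist that value across restarts.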

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (:  —will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io>
wrote:

> Hello everyone,
>
> Thanks for all your feedback for this KIP!
>
> I think that the key to choosing proper names for this API is understanding
> the terms used inside the StoreChangelogReader. Currently, this class has
> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion,
> using StandbyUpdateListener for the interface fits these terms better.
> The same applies to onUpdateStart/Suspended.
>
> StoreChangelogReader uses "the same mechanism" for active task restoration
> and standby task updates, but this is an implementation detail. Under
> normal circumstances (no rebalances or task migrations), the changelog
> reader will be in STANDBY_UPDATING, which means it will be updating standby
> tasks as long as there are new records in the changelog topic. That's why I
> prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100%
> align with StateRestoreListener, but either one is fine.
>
> Edu
>
> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com>
> wrote:
>
> > Hello Colt,
> >
> > Thanks for writing the KIP! I have read through the updated KIP and
> > overall it looks great. I only have minor naming comments (well,
> > isn't naming the least boring thing to discuss, and the one that takes
> > the most time for KIPs :P):
> >
> > 1. I tend to agree with Sophie regarding whether or not to include
> > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > it is also more consistent with the functions of
> > "StateRestoreListener" where we do not name it as
> > "onStateRestoreStart" etc.
> >
> > 2. I know in community discussions we sometimes say "a standby is
> > promoted to active", but in the official code / java docs we did not
> > have a term of "promotion", since what the code does is really recycle
> > the task (while keeping its state stores open), and create a new
> > active task that takes in the recycled state stores and just changes
> > the other fields like task type etc. After thinking about this for a
> > bit, I tend to feel that "promoted" is indeed a better name for user
> > facing purposes while "recycle" is more of a technical detail inside
> > the code and could be abstracted away from users. So I feel keeping
> > the name "PROMOTED" is fine.
> >
> > 3. Regarding "earliestOffset", it does feel like we cannot always
> > avoid another call to the Kafka API. And on the other hand, I also
> > tend to think that such bookkeeping may be better done at the app
> > level than from the Streams' public API level. I.e. the app could keep
> > a "first ever starting offset" per "topic-partition-store" key, and
> > when we have a rolling restart and hence some standby task keeps
> > "jumping" from one client to another via task assignment, the app
> > would update this value just once when it finds the
> > "topic-partition-store" was never triggered before. What do you
> > think?
> >
> > 4. I do not have a strong opinion either, but what about
> > "onBatchUpdated"?
> >
> >
> > Guozhang
> >
> > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> > wrote:
> > >
> > > Sophie,
> > >
> > > Thank you very much for such a detailed review of the KIP. It might
> > > actually be longer than the original KIP in the first place!
> > >
> > > 1. Ack'ed and fixed.
> > >
> > > 2. Correct, this is a confusing passage and requires context:
> > >
> > > One thing on our list of TODO's regarding reliability is to determine
> > > how to configure `session.timeout.ms`. In our Kubernetes Environment,
> > > an instance of our Streams App can be terminated, restarted, and get
> > > back into the "RUNNING" Streams state in about 20 seconds. We have two
> > > options here: a) set session.timeout.ms to 30 seconds or so, and deal
> > > with 20 seconds of unavailability for affected partitions, but avoid
> > > shuffling Tasks; or b) set session.timeout.ms to a low value, such as
> > > 6 seconds (heartbeat.interval.ms of 2000), and reduce the
> > > unavailability window during a rolling bounce but incur an "extra"
> > > rebalance. There are several different costs to a rebalance, including
> > > the shuffling of standby tasks. JMX metrics are not fine-grained enough
> > > to give us an accurate picture of what's going on with the whole
> > > Standby Task Shuffle Dance. I hypothesize that the Standby Update
> > > Listener might help us clarify just how the shuffling actually (not
> > > theoretically) works, which will help us make a more informed decision
> > > about the session timeout config.
> > >
> > > If you think this is worth putting in the KIP, I'll polish it and do
> > > so; else, I'll remove the current half-baked explanation.
> > >
> > > 3. Overall, I agree with this. In our app, each Task has only one
> > > Store to reduce the number of changelog partitions, so I sometimes
> > > forget the distinction between the two concepts, as reflected in the
> > > KIP (:
> > >
> > > 3a. I don't like the word "Restore" here, since Restoration refers to
> > > an Active Task getting caught up in preparation to resume processing.
> > > `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a
> > > native Python speaker so I do prefer shorter names anyways (:
> > >
> > > 3b1. +1 to removing the word 'Task'.
> > >
> > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> > > `onStandbyUpdateStart()` which matches the name of the Interface
> > > "StandbyUpdateListener". (the python part of me hates this, however)
> > >
> > > 3b3. Going back to question 2), `earliestOffset` was intended to allow
> > > us to more easily calculate the amount of state _already loaded_ in
> > > the store by computing (startingOffset - earliestOffset). This would
> > > help us see how much inefficiency is introduced in a rolling restart:
> > > if we start with an up-to-date standby before the restart, and after
> > > the whole restart the Task is shuffled onto an instance where there is
> > > no previous state, then that is expensive. However, if the final
> > > shuffling results in the Task back on an instance with a lot of
> > > pre-built state, it's not expensive.
> > >
> > > If a call over the network is required to determine the earliestOffset,
> > > then this is a "hard no-go" for me, and we will remove it (I'll have to
> > > check with Eduwer as he is close to having a working implementation). I
> > > think we can probably determine what we wanted to see in a different
> > > way, but it will take more thinking. If `earliestOffset` is confusing,
> > > perhaps rename it to `earliestChangelogOffset`?
> > >
> > > `startingOffset` is easy to remove as it can be determined from the
> > > first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > >
> > > Anyways, I've updated the JavaDoc in the interface; hopefully it's more
> > > clear. Awaiting further instructions here.
> > >
> > > 3c. Good point; after thinking, my order of preference is
> > > `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` ->
> > > `onBatchRestored()`. I am less fond of "processed" because when I was
> > > first learning Streams I mistakenly thought that standby tasks actually
> > > processed the input topic rather than loaded from the changelog. I'll
> > > defer to you here.
> > >
> > > 3d. +1 to `onUpdateSuspended()`, or better yet
> > > `onStandbyUpdateSuspended()`. Will check about the implementation of
> > > keeping track of the number of records loaded.
> > >
> > > 4a. I think this might be best in a separate KIP, especially given that
> > > this is my and Eduwer's first time contributing to Kafka (so we want to
> > > minimize the blast radius).
> > >
> > > 4b. I might respectfully (and timidly) push back here: RECYCLED for an
> > > Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense
> > > from the standpoint of an Active Task, but recycling to me sounds like
> > > throwing stuff away, such that the resources (i.e. disk space) can be
> > > used by a separate Task. As an alternative, rather than trying to reuse
> > > the same enum, maybe rename it to `StandbySuspendReason` to avoid
> > > naming conflicts with `ActiveSuspendReason`? However, I could be
> > > convinced to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.
> > >
> > > TLDR:
> > >
> > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > T2. Will update to `onStandbyUpdateStart()`
> > > T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > T4. I don't like `onBatchProcessed()` too much, perhaps
> > > `onBatchLoaded()`?
> > > T5. Will update to `onStandbyUpdateSuspended()`
> > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather
> > > than renaming PROMOTED to RECYCLED? @Eduwer?
> > >
> > > Long Live the Otter,
> > > Colt McNealy
> > >
> > > *Founder, LittleHorse.dev*
> > >
> > >
> > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman
> > > <sop...@responsive.dev> wrote:
> > >
> > > > Hey Colt! Thanks for the KIP -- this will be a great addition to
> > > > Streams, I can't believe we've gone so long without this.
> > > >
> > > > Overall the proposal makes sense, but I had a handful of fairly minor
> > > > questions and suggestions/requests
> > > >
> > > > 1. Seems like the last sentence in the 2nd paragraph of the
> > > > Motivation section is cut off and incomplete -- "want to be able to
> > > > know " what exactly?
> > > >
> > > > 2. This isn't that important since the motivation as a whole is clear
> > > > to me and convincing enough, but I'm not quite sure I understand the
> > > > example at the end of the Motivation section. How are standby tasks
> > > > (and the ability to hook into and monitor their status) related to
> > > > the session.timeout.ms config?
> > > >
> > > > 3. To help both old and new users of Kafka Streams understand this
> > > > new restore listener and its purpose/semantics, can we try to name
> > > > the class and callbacks in a way that's more consistent with the
> > > > active task restore listener?
> > > >
> > > > 3a. StandbyTaskUpdateListener:
> > > > The existing restore listener is called StateRestoreListener, so the
> > > > new one could be called something like StandbyStateRestoreListener.
> > > > Although we typically refer to standby tasks as "processing" rather
> > > > than "restoring" records -- ie restoration is a term for active task
> > > > state specifically. I actually like the original suggestion if we
> > > > just drop the "Task" part of the name, ie StandbyUpdateListener. I
> > > > think either that or StandbyRestoreListener would be fine and
> > > > probably the two best options.
> > > > Also, this probably goes without saying but any change to the name of
> > > > this class should of course be reflected in the KafkaStreams#setXXX
> > > > API as well
> > > >
> > > > 3b. #onTaskCreated
> > > > I know the "start" callback feels a bit different for the standby
> > > > task updater vs an active task beginning restoration, but I think we
> > > > should try to keep the various callbacks aligned to their active
> > > > restore listener counterpart. We can/should just replace the term
> > > > "restore" with "update" for the callback method names the same way we
> > > > do for the class name, which in this case would give us
> > > > #onUpdateStart. Personally I like this better, but it's ultimately up
> > > > to you. However, I would push back against anything that includes the
> > > > word "Task" (eg #onTaskCreated) as the listener is actually not
> > > > scoped to the task itself but instead to the individual state
> > > > store(s). This is the main reason I would prefer calling it something
> > > > like #onUpdateStart, which keeps the focus on the store being updated
> > > > rather than the task that just happens to own this store.
> > > > One last thing on this callback -- do we really need both the
> > > > `earliestOffset` and `startingOffset`? I feel like this might be more
> > > > confusing than it is helpful (tbh even I'm not completely sure I know
> > > > what the earliestOffset is supposed to represent). More importantly,
> > > > is this all information that is already available and able to be
> > > > passed in to the callback by Streams? I haven't checked on this but
> > > > it feels like the earliestOffset is likely to require a remote call,
> > > > either by the embedded consumer or via the admin client. If so, the
> > > > ROI on including this parameter seems quite low (if not outright
> > > > negative)
> > > >
> > > > 3c. #onBatchRestored
> > > > If we opt to use the term "update" in place of "restore" elsewhere,
> > > > then we should consider doing so here as well. What do you think
> > > > about #onBatchUpdated, or even #onBatchProcessed?
> > > > I'm actually not super concerned about this particular API, and
> > > > honestly I think we can use restore or update interchangeably here,
> > > > so if you don't like any of the suggested names (and no one can think
> > > > of anything better), I would just stick with #onBatchRestored. In
> > > > this case, it kind of makes the most sense.
> > > >
> > > > 3d. #onTaskSuspended
> > > > Along the same lines as 3b above, #onUpdateSuspended or just
> > > > #onRestoreSuspended probably makes more sense for this callback.
> > > > Also, I notice the StateRestoreListener passes in the total number of
> > > > records restored to its #onRestoreSuspended. Assuming we already
> > > > track that information in Streams and have it readily available to
> > > > pass in at whatever point we would be invoking this callback, that
> > > > might be a useful parameter for the standby listener to have as well
> > > >
> > > > 4. I totally love the SuspendReason thing, just two notes/requests:
> > > >
> > > > 4a. Feel free to push back against adding onto the scope of this KIP,
> > > > but it would be great to expand the active state restore listener
> > > > with this SuspendReason enum as well. It would be really useful for
> > > > both variants of restore listener
> > > >
> > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> > > > standby tasks it means basically the same thing, the point is that
> > > > active tasks can also be recycled into standbys through the same
> > > > mechanism. This way they can share the SuspendReason enum -- not that
> > > > it's necessary for them to share, I just think it would be a good
> > > > idea to keep the two restore listeners aligned to the highest degree
> > > > possible.
> > > > I was actually considering proposing a short KIP with a new
> > > > RecyclingListener (or something) specifically for this exact kind of
> > > > thing, since we currently have literally zero insight into the
> > > > recycling process. It's practically impossible to tell when a store
> > > > has been converted from active to standby, or vice versa. So having
> > > > access to the SuspendReason, and more importantly having a callback
> > > > guaranteed to notify you when a state store is recycled whether
> > > > active or standby, would be amazing.
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > >
> > > >
> > > > ---------- Forwarded message ---------
> > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > To: <dev@kafka.apache.org>
> > > > >
> > > > >
> > > > > Hi all,
> > > > >
> > > > > We would like to propose a small KIP to improve the ability of
> > > > > Streams apps to monitor the progress of their standby tasks through
> > > > > a callback interface.
> > > > >
> > > > > We have a nearly-working implementation on our fork and are curious
> > > > > for feedback.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > >
> > > > > Thank you,
> > > > > Colt McNealy
> > > > >
> > > > > *Founder, LittleHorse.dev*
> > > > >
> > > >
> >
>
