Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*
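
For anyone skimming the thread, here is a rough sketch of the shape agreed on below: the renamed callbacks, the two SuspendReason values, and no `earliestOffset`. It is only an illustration. The interface is a simplified local stand-in, the parameter lists are assumptions, and `FirstOffsetTracker` is a hypothetical name; the KIP page remains the authoritative definition. The tracker shows the app-level bookkeeping Guozhang suggested in place of `earliestOffset`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the listener discussed in this thread.
// Parameter lists are assumptions for illustration, not the KIP's signatures.
interface StandbyUpdateListener {
    enum SuspendReason { MIGRATED, PROMOTED }

    void onUpdateStart(String topicPartition, String storeName, long startingOffset);

    void onBatchLoaded(String topicPartition, String storeName, long batchEndOffset, long batchSize);

    void onUpdateSuspended(String topicPartition, String storeName, long storeOffset, SuspendReason reason);
}

// Hypothetical app-level bookkeeping: remember the first starting offset seen
// per topic-partition-store key instead of asking Streams for an earliestOffset.
class FirstOffsetTracker implements StandbyUpdateListener {
    private final Map<String, Long> firstStartingOffsets = new ConcurrentHashMap<>();
    private final Map<String, Long> recordsLoaded = new ConcurrentHashMap<>();

    private static String key(String topicPartition, String storeName) {
        return topicPartition + "/" + storeName;
    }

    @Override
    public void onUpdateStart(String topicPartition, String storeName, long startingOffset) {
        // putIfAbsent keeps only the first value, so later starts do not overwrite it.
        firstStartingOffsets.putIfAbsent(key(topicPartition, storeName), startingOffset);
    }

    @Override
    public void onBatchLoaded(String topicPartition, String storeName, long batchEndOffset, long batchSize) {
        // Accumulate how many records this instance has loaded for the store.
        recordsLoaded.merge(key(topicPartition, storeName), batchSize, Long::sum);
    }

    @Override
    public void onUpdateSuspended(String topicPartition, String storeName, long storeOffset, SuspendReason reason) {
        System.out.printf("standby update suspended: %s/%s at offset %d (%s)%n",
                topicPartition, storeName, storeOffset, reason);
    }

    // Returns null if no update has started for this key on this instance.
    Long firstStartingOffset(String topicPartition, String storeName) {
        return firstStartingOffsets.get(key(topicPartition, storeName));
    }

    long recordsLoaded(String topicPartition, String storeName) {
        return recordsLoaded.getOrDefault(key(topicPartition, storeName), 0L);
    }

    public static void main(String[] args) {
        FirstOffsetTracker tracker = new FirstOffsetTracker();
        tracker.onUpdateStart("my-changelog-0", "my-store", 100L);
        tracker.onBatchLoaded("my-changelog-0", "my-store", 149L, 50L);
        tracker.onUpdateSuspended("my-changelog-0", "my-store", 149L, SuspendReason.PROMOTED);
        System.out.println("first starting offset: "
                + tracker.firstStartingOffset("my-changelog-0", "my-store"));
    }
}
```

Note that the maps here live in a single JVM. Tracking a standby that moves between clients during a rolling restart would need the values exported somewhere shared (metrics, logs, or an external store), which this sketch does not attempt.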


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com>
wrote:

> Thanks for the summary, that looks good to me.
>
> Guozhang
>
> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
> >
> > Hello there!
> >
> > Thanks everyone for the comments. There's a lot of back-and-forth going
> on,
> > so I'll do my best to summarize what everyone's said in TLDR format:
> >
> > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> similarly
> > for the other methods.
> > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > 3. Remove the `earliestOffset` parameter for performance reasons.
> >
> > If that's all fine with everyone, I'll update the KIP and we—well, mostly
> > Edu (:  —will open a PR.
> >
> > Cheers,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> >
> > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io>
> > wrote:
> >
> > > Hello everyone,
> > >
> > > Thanks for all your feedback for this KIP!
> > >
> > > I think that the key to choosing proper names for this API is
> understanding
> > > the terms used inside the StoreChangelogReader. Currently, this class
> has
> > > two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> opinion,
> > > using StandbyUpdateListener for the interface fits these terms better.
> > > The same applies to onUpdateStart/Suspended.
> > >
> > > StoreChangelogReader uses "the same mechanism" for active task
> restoration
> > > and standby task updates, but this is an implementation detail. Under
> > > normal circumstances (no rebalances or task migrations), the changelog
> > > reader will be in STANDBY_UPDATING, which means it will be updating
> standby
> > > tasks as long as there are new records in the changelog topic. That's
> why I
> > > prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> 100%
> > > align with StateRestoreListener, but either one is fine.
> > >
> > > Edu
> > >
> > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> guozhang.wang...@gmail.com>
> > > wrote:
> > >
> > > > Hello Colt,
> > > >
> > > > Thanks for writing the KIP! I have read through the updated KIP and
> > > > overall it looks great. I only have minor naming comments (well,
> > > > isn't naming the least boring stuff to discuss, and the thing that
> > > > takes the most time for KIPs :P):
> > > >
> > > > 1. I tend to agree with Sophie regarding whether or not to include
> > > > "Standby" in the functions of "onStandbyUpdateStart/Suspended", since
> > > > it is also more consistent with the functions of
> > > > "StateRestoreListener" where we do not name it as
> > > > "onStateRestoreStart" etc.
> > > >
> > > > 2. I know in community discussions we sometimes say "a standby is
> > > > promoted to active", but in the official code / java docs we did not
> > > > have a term of "promotion", since what the code does is really
> recycle
> > > > the task (while keeping its state stores open), and create a new
> > > > active task that takes in the recycled state stores and just changes
> > > > the other fields like task type etc. After thinking about this for a
> > > > bit, I tend to feel that "promoted" is indeed a better name for user
> > > > facing purposes while "recycle" is more of a technical detail inside
> > > > the code and could be abstracted away from users. So I feel keeping
> > > > the name "PROMOTED" is fine.
> > > >
> > > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > > avoid another call to the Kafka API. And on the other hand, I also
> > > > tend to think that such bookkeeping may be better done at the app
> > > > level than from the Streams' public API level. I.e. the app could
> keep
> > > > a "first ever starting offset" per "topic-partition-store" key, and
> > > > when we have a rolling restart and hence some standby task keeps
> > > > "jumping" from one client to another via task assignment, the app
> > > > would update this value just once, when it finds the
> > > > "topic-partition-store" was never triggered before. What do you
> > > > think?
> > > >
> > > > 4. I do not have a strong opinion either, but what about
> > > "onBatchUpdated" ?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> > > wrote:
> > > > >
> > > > > > Sophie,
> > > > >
> > > > > Thank you very much for such a detailed review of the KIP. It might
> > > > > actually be longer than the original KIP in the first place!
> > > > >
> > > > > 1. Ack'ed and fixed.
> > > > >
> > > > > 2. Correct, this is a confusing passage and requires context:
> > > > >
> > > > > One thing on our list of TODO's regarding reliability is to
> determine
> > > how
> > > > > to configure `session.timeout.ms`. In our Kubernetes Environment,
> an
> > > > > instance of our Streams App can be terminated, restarted, and get
> back
> > > > into
> > > > > the "RUNNING" Streams state in about 20 seconds. We have two
> options
> > > > here:
> > > > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> > > seconds
> > > > of
> > > > > unavailability for affected partitions, but avoid shuffling Tasks;
> or
> > > b)
> > > > > set session.timeout.ms to a low value, such as 6 seconds (
> > > > > heartbeat.interval.ms of 2000), and reduce the unavailability
> window
> > > > during
> > > > > a rolling bounce but incur an "extra" rebalance. There are several
> > > > > different costs to a rebalance, including the shuffling of standby
> > > tasks.
> > > > > JMX metrics are not fine-grained enough to give us an accurate
> picture
> > > of
> > > > > what's going on with the whole Standby Task Shuffle Dance. I
> > > hypothesize
> > > > > that the Standby Update Listener might help us clarify just how the
> > > > > shuffling actually (not theoretically) works, which will help us
> make a
> > > > > more informed decision about the session timeout config.
> > > > >
> > > > > If you think this is worth putting in the KIP, I'll polish it and
> do
> > > so;
> > > > > else, I'll remove the current half-baked explanation.
> > > > >
> > > > > 3. Overall, I agree with this. In our app, each Task has only one
> Store
> > > > to
> > > > > reduce the number of changelog partitions, so I sometimes forget
> the
> > > > > distinction between the two concepts, as reflected in the KIP (:
> > > > >
> > > > > 3a. I don't like the word "Restore" here, since Restoration refers
> to
> > > an
> > > > > Active Task getting caught up in preparation to resume processing.
> > > > > `StandbyUpdateListener` is fine by me; I have updated the KIP. I
> am a
> > > > > native Python speaker so I do prefer shorter names anyways (:
> > > > >
> > > > > 3b1. +1 to removing the word 'Task'.
> > > > >
> > > > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> > > > > `onStandbyUpdateStart()` which matches the name of the Interface
> > > > > "StandbyUpdateListener". (the python part of me hates this,
> however)
> > > > >
> > > > > 3b3. Going back to question 2), `earliestOffset` was intended to
> allow
> > > us
> > > > > to more easily calculate the amount of state _already loaded_ in
> the
> > > > store
> > > > > by subtracting (startingOffset - earliestOffset). This would help
> us
> > > see
> > > > > how much inefficiency is introduced in a rolling restart—if we end
> up
> > > > going
> > > > > from a situation with an up-to-date standby before the restart, and
> > > then
> > > > > after the whole restart, the Task is shuffled onto an instance
> where
> > > > there
> > > > > is no previous state, then that is expensive. However, if the final
> > > > > shuffling results in the Task back on an instance with a lot of
> > > pre-built
> > > > > state, it's not expensive.
> > > > >
> > > > > If a call over the network is required to determine the
> earliestOffset,
> > > > > then this is a "hard no-go" for me, and we will remove it (I'll
> have to
> > > > > check with Eduwer as he is close to having a working
> implementation). I
> > > > > think we can probably determine what we wanted to see in a
> different
> > > > > > way, but it will take more thinking. If `earliestOffset` is
> confusing,
> > > > > perhaps rename it to `earliestChangelogOffset`?
> > > > >
> > > > > `startingOffset` is easy to remove as it can be determined from the
> > > first
> > > > > call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > > >
> > > > > Anyways, I've updated the JavaDoc in the interface; hopefully it's
> more
> > > > > clear. Awaiting further instructions here.
> > > > >
> > > > > 3c. Good point; after thinking, my preference is
> `onBatchLoaded()`  ->
> > > > > `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`.
> I am
> > > > > less fond of "processed" because when I was first learning Streams
> I
> > > > > mistakenly thought that standby tasks actually processed the input
> > > topic
> > > > > rather than loaded from the changelog. I'll defer to you here.
> > > > >
> > > > > 3d. +1 to `onUpdateSuspended()`, or better yet
> > > > > `onStandbyUpdateSuspended()`. Will check about the implementation
> of
> > > > > keeping track of the number of records loaded.
> > > > >
> > > > > 4a. I think this might be best in a separate KIP, especially given
> that
> > > > > this is my and Eduwer's first time contributing to Kafka (so we
> want to
> > > > > minimize the blast radius).
> > > > >
> > > > > > 4b. I might respectfully (and timidly) push back here: RECYCLED
> for an
> > > > > Active Task is a bit confusing to me. DEMOTED and MIGRATED make
> sense
> > > > from
> > > > > > the standpoint of an Active Task; recycling, to me, sounds like
> throwing
> > > > > stuff away, such that the resources (i.e. disk space) can be used
> by a
> > > > > separate Task. As an alternative rather than trying to reuse the
> same
> > > > enum,
> > > > > maybe rename it to `StandbySuspendReason` to avoid naming conflicts
> > > with
> > > > > `ActiveSuspendReason`? However, I could be convinced to rename
> PROMOTED
> > > > ->
> > > > > RECYCLED, especially if Eduwer agrees.
> > > > >
> > > > > TLDR:
> > > > >
> > > > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > > > T2. Will update to `onStandbyUpdateStart()`
> > > > > T3. Awaiting further instructions on earliestOffset and
> startingOffset.
> > > > > T4. I don't like `onBatchProcessed()` too much, perhaps
> > > > `onBatchLoaded()`?
> > > > > T5. Will update to `onStandbyUpdateSuspended()`
> > > > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason,
> rather
> > > > than
> > > > > renaming PROMOTED to RECYCLED? @Eduwer?
> > > > >
> > > > > Long Live the Otter,
> > > > > Colt McNealy
> > > > >
> > > > > *Founder, LittleHorse.dev*
> > > > >
> > > > >
> > > > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <
> > > > sop...@responsive.dev>
> > > > > wrote:
> > > > >
> > > > > > Hey Colt! Thanks for the KIP -- this will be a great addition to
> > > > Streams, I
> > > > > > can't believe we've gone so long without this.
> > > > > >
> > > > > > Overall the proposal makes sense, but I had a handful of fairly
> minor
> > > > > > questions and suggestions/requests
> > > > > >
> > > > > > 1. Seems like the last sentence in the 2nd paragraph of the
> > > Motivation
> > > > > > section is cut off and incomplete -- "want to be able to know "
> what
> > > > > > exactly?
> > > > > >
> > > > > > 2. This isn't that important since the motivation as a whole is
> clear
> > > > to me
> > > > > > and convincing enough, but I'm not quite sure I understand the
> > > example
> > > > at
> > > > > > the end of the Motivation section. How are standby tasks (and the
> > > > ability
> > > > > > to hook into and monitor their status) related to the
> > > > session.timeout.ms
> > > > > > config?
> > > > > >
> > > > > > 3. To help both old and new users of Kafka Streams understand
> this
> > > new
> > > > > > restore listener and its purpose/semantics, can we try to name
> the
> > > > class
> > > > > > and
> > > > > >  callbacks in a way that's more consistent with the active task
> > > restore
> > > > > > listener?
> > > > > >
> > > > > > 3a. StandbyTaskUpdateListener:
> > > > > > The existing restore listener is called StateRestoreListener, so
> the
> > > > new
> > > > > > one could be called something like StandbyStateRestoreListener.
> > > > Although
> > > > > > we typically refer to standby tasks as "processing" rather than
> > > > "restoring"
> > > > > > records -- ie restoration is a term for active task state
> > > > specifically. I
> > > > > > actually
> > > > > > like the original suggestion if we just drop the "Task" part of
> the
> > > > name,
> > > > > > ie StandbyUpdateListener. I think either that or
> > > StandbyRestoreListener
> > > > > > would be fine and probably the two best options.
> > > > > > Also, this probably goes without saying but any change to the
> name of
> > > > this
> > > > > > class should of course be reflected in the KafkaStreams#setXXX
> API as
> > > > well
> > > > > >
> > > > > > 3b. #onTaskCreated
> > > > > >  I know the "start" callback feels a bit different for the
> standby
> > > task
> > > > > > updater vs an active task beginning restoration, but I think we
> > > should
> > > > try
> > > > > > to
> > > > > > keep the various callbacks aligned to their active restore
> listener
> > > > > > counterpart. We can/should just replace the term "restore" with
> > > > "update"
> > > > > > for the
> > > > > > callback method names the same way we do for the class name,
> which in
> > > > this
> > > > > > case would give us #onUpdateStart. Personally I like this better,
> > > > > > but it's ultimately up to you. However, I would push back against
> > > > anything
> > > > > > that includes the word "Task" (eg #onTaskCreated) as the listener
> > > > > >  is actually not scoped to the task itself but instead to the
> > > > individual
> > > > > > state store(s). This is the main reason I would prefer calling it
> > > > something
> > > > > > like #onUpdateStart, which keeps the focus on the store being
> updated
> > > > > > rather than the task that just happens to own this store
> > > > > > One last thing on this callback -- do we really need both the
> > > > > > `earliestOffset` and `startingOffset`? I feel like this might be
> more
> > > > > > confusing than it
> > > > > > is helpful (tbh even I'm not completely sure I know what the
> > > > earliestOffset
> > > > > > > is supposed to represent). More importantly, is this all
> information
> > > > > > that is already available and able to be passed in to the
> callback by
> > > > > > Streams? I haven't checked on this but it feels like the
> > > > earliestOffset is
> > > > > > likely to require a remote call, either by the embedded consumer
> or
> > > > via the
> > > > > > admin client. If so, the ROI on including this parameter seems
> > > > > > quite low (if not outright negative)
> > > > > >
> > > > > > 3c. #onBatchRestored
> > > > > > If we opt to use the term "update" in place of "restore"
> elsewhere,
> > > > then we
> > > > > > should consider doing so here as well. What do you think about
> > > > > > #onBatchUpdated, or even #onBatchProcessed?
> > > > > > I'm actually not super concerned about this particular API, and
> > > > honestly I
> > > > > > think we can use restore or update interchangeably here, so if
> you
> > > > > >  don't like any of the suggested names (and no one can think of
> > > > anything
> > > > > > better), I would just stick with #onBatchRestored. In this case,
> > > > > > it kind of makes the most sense.
> > > > > >
> > > > > > 3d. #onTaskSuspended
> > > > > > Along the same lines as 3b above, #onUpdateSuspended or just
> > > > > > #onRestoreSuspended probably makes more sense for this callback.
> > > Also,
> > > > > >  I notice the StateRestoreListener passes in the total number of
> > > > records
> > > > > > restored to its #onRestoreSuspended. Assuming we already track
> > > > > > that information in Streams and have it readily available to
> pass in
> > > at
> > > > > > whatever point we would be invoking this callback, that might be
> a
> > > > > > useful  parameter for the standby listener to have as well
> > > > > >
> > > > > > 4. I totally love the SuspendReason thing, just two
> notes/requests:
> > > > > >
> > > > > > 4a. Feel free to push back against adding onto the scope of this
> KIP,
> > > > but
> > > > > > it would be great to expand the active state restore listener
> with
> > > this
> > > > > > SuspendReason enum as well. It would be really useful for both
> > > > variants of
> > > > > > restore listener
> > > > > >
> > > > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> > > standby
> > > > > > tasks it means basically the same thing, the point is that active
> > > > > > tasks can also be recycled into standbys through the same
> mechanism.
> > > > This
> > > > > > way they can share the SuspendReason enum -- not that it's
> > > > > > necessary for them to share, I just think it would be a good
> idea to
> > > > keep
> > > > > > > the two restore listeners aligned to the highest degree possible.
> > > > > > I was actually considering proposing a short KIP with a new
> > > > > > RecyclingListener (or something) specifically for this exact
> kind of
> > > > thing,
> > > > > > since we
> > > > > > currently have literally zero insight into the recycling process.
> > > It's
> > > > > > practically impossible to tell when a store has been converted
> from
> > > > active
> > > > > > to
> > > > > > standby, or vice versa. So having access to the SuspendReason,
> and
> > > more
> > > > > > importantly having a callback guaranteed to notify you when a
> > > > > > state store is recycled whether active or standby, would be
> amazing.
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > > > >
> > > > > >
> > > > > > ---------- Forwarded message ---------
> > > > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > > > To: <dev@kafka.apache.org>
> > > > > > >
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > We would like to propose a small KIP to improve the ability of
> > > > Streams
> > > > > > apps
> > > > > > > to monitor the progress of their standby tasks through a
> callback
> > > > > > > interface.
> > > > > > >
> > > > > > > We have a nearly-working implementation on our fork and are
> curious
> > > > for
> > > > > > > feedback.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > > > >
> > > > > > > Thank you,
> > > > > > > Colt McNealy
> > > > > > >
> > > > > > > *Founder, LittleHorse.dev*
> > > > > > >
> > > > > >
> > > >
> > >
>
