Hi all,

It's a nice improvement! I don't have anything to add on top of the
previous comments; I just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io> wrote:
>
> Thanks, Guozhang. I've updated the KIP and will start a vote.
>
> Colt McNealy
>
> *Founder, LittleHorse.dev*
>
>
> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
>
> > Thanks for the summary, that looks good to me.
> >
> > Guozhang
> >
> > On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > Hello there!
> > >
> > > Thanks everyone for the comments. There's a lot of back-and-forth going
> > > on, so I'll do my best to summarize what everyone's said in TLDR format:
> > >
> > > 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do
> > > similarly for the other methods.
> > > 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> > > 3. Remove the `earliestOffset` parameter for performance reasons.
> > >
> > > If that's all fine with everyone, I'll update the KIP and we (well,
> > > mostly Edu) will open a PR (:
> > >
> > > Cheers,
> > > Colt McNealy
> > >
> > > *Founder, LittleHorse.dev*
> > >
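The three points in the summary above can be made concrete with a small Java sketch. It is an illustration only, not the KIP's final API: `TopicPartition` is replaced by a plain `String` so the code compiles without Kafka on the classpath, the batch callback is called `onBatchLoaded` even though that name is still open at this point in the thread, and the parameter lists (including the hypothetical `recordsLoaded`, which follows Sophie's 3d suggestion further down) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the listener shape agreed in the summary; this is NOT
// the actual Kafka Streams API. Topic-partitions are plain Strings here.
enum SuspendReason { PROMOTED, MIGRATED }   // point 2: keep both values

interface StandbyUpdateListener {
    // Point 1: "Standby" dropped from the method names.
    // Point 3: no earliestOffset parameter, so no remote call is needed.
    void onUpdateStart(String topicPartition, String storeName, long startingOffset);

    // Assumed name and parameters; the batch callback was still under discussion.
    void onBatchLoaded(String topicPartition, String storeName,
                       long batchEndOffset, long batchSize, long currentEndOffset);

    // recordsLoaded is an assumed parameter mirroring the total-records
    // argument of StateRestoreListener#onRestoreSuspended.
    void onUpdateSuspended(String topicPartition, String storeName,
                           long storeOffset, long currentEndOffset,
                           long recordsLoaded, SuspendReason reason);
}

// Example listener that records one human-readable event per callback.
class RecordingStandbyUpdateListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(String topicPartition, String storeName, long startingOffset) {
        events.add("start " + storeName + " " + topicPartition + " at " + startingOffset);
    }

    @Override
    public void onBatchLoaded(String topicPartition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        // Lag: how far the standby store still is behind the changelog end offset.
        events.add("batch " + storeName + " +" + batchSize
                + " lag " + (currentEndOffset - batchEndOffset));
    }

    @Override
    public void onUpdateSuspended(String topicPartition, String storeName,
                                  long storeOffset, long currentEndOffset,
                                  long recordsLoaded, SuspendReason reason) {
        events.add("suspend " + storeName + " " + reason + " after " + recordsLoaded);
    }
}
```

Per Sophie's 3a, whatever the final class name is, a matching setter on `KafkaStreams` would be how an application registers such a listener.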
> > >
> > > On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > Thanks for all your feedback for this KIP!
> > > >
> > > > I think that the key to choosing proper names for this API is
> > > > understanding the terms used inside the StoreChangelogReader.
> > > > Currently, this class has two possible states: ACTIVE_RESTORING and
> > > > STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the
> > > > interface fits these terms better. The same applies to
> > > > onUpdateStart/Suspended.
> > > >
> > > > StoreChangelogReader uses "the same mechanism" for active task
> > > > restoration and standby task updates, but this is an implementation
> > > > detail. Under normal circumstances (no rebalances or task migrations),
> > > > the changelog reader will be in STANDBY_UPDATING, which means it will
> > > > be updating standby tasks as long as there are new records in the
> > > > changelog topic. That's why I prefer onStandbyUpdated over
> > > > onBatchUpdated, even if it doesn't align 100% with StateRestoreListener;
> > > > either one is fine, though.
> > > >
> > > > Edu
> > > >
> > > > On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:
> > > >
> > > > > Hello Colt,
> > > > >
> > > > > Thanks for writing the KIP! I have read through the updated KIP and
> > > > > overall it looks great. I only have minor naming comments (well,
> > > > > isn't naming the least boring stuff to discuss, and the thing that
> > > > > takes the most time in KIPs :P):
> > > > >
> > > > > 1. I tend to agree with Sophie about not including "Standby" in the
> > > > > names of "onStandbyUpdateStart/Suspended", since that is also more
> > > > > consistent with the methods of "StateRestoreListener", where we do
> > > > > not name them "onStateRestoreStart" etc.
> > > > >
> > > > > 2. I know in community discussions we sometimes say "a standby is
> > > > > promoted to active", but in the official code / Javadocs we do not
> > > > > have the term "promotion", since what the code really does is recycle
> > > > > the task (while keeping its state stores open) and create a new
> > > > > active task that takes over the recycled state stores, just changing
> > > > > other fields like the task type. After thinking about this for a
> > > > > bit, I tend to feel that "promoted" is indeed a better name for
> > > > > user-facing purposes, while "recycle" is more of a technical detail
> > > > > inside the code that can be abstracted away from users. So I feel
> > > > > keeping the name "PROMOTED" is fine.
> > > > >
> > > > > 3. Regarding "earliestOffset", it does feel like we cannot always
> > > > > avoid another call to the Kafka API. On the other hand, I also tend
> > > > > to think that such bookkeeping may be better done at the app level
> > > > > than at the Streams public API level. I.e. the app could keep a
> > > > > "first ever starting offset" per "topic-partition-store" key, and
> > > > > when we have a rolling restart and hence some standby task keeps
> > > > > "jumping" from one client to another via task assignment, the app
> > > > > would update this value just once, when it finds that the
> > > > > "topic-partition-store" was never triggered before. What do you
> > > > > think?
> > > > >
> > > > > 4. I do not have a strong opinion either, but what about
> > > > > "onBatchUpdated"?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
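Guozhang's point 3 above (keeping a "first ever starting offset" per "topic-partition-store" key at the application level) could look roughly like the following Java sketch. The class and method names are hypothetical and not part of Kafka Streams. The map is in-memory, so it only sees standbys hosted by one JVM; following a standby that "jumps" between clients during a rolling restart would need the values kept somewhere shared, which this sketch does not attempt. `offsetsAheadOfFirstStart` is the subtraction Colt describes in 3b3 further down the thread, with the first-ever starting offset standing in for `earliestOffset`, so it only approximates what he wanted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical app-level bookkeeping (not part of Kafka Streams): remembers the
// first starting offset ever seen for each topic-partition-store key.
class FirstStartingOffsetTracker {
    private final Map<String, Long> firstStartingOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, String storeName) {
        // Kafka topic names cannot contain '|', so the key is unambiguous.
        return topic + "|" + partition + "|" + storeName;
    }

    // Call from the listener's start callback. Only the first call per key
    // stores a value; later calls leave it untouched. Returns the stored value.
    long recordStart(String topic, int partition, String storeName, long startingOffset) {
        Long previous = firstStartingOffsets.putIfAbsent(
                key(topic, partition, storeName), startingOffset);
        return previous == null ? startingOffset : previous;
    }

    // startingOffset minus the first-ever starting offset for this key:
    // a rough proxy for how much state was already in place.
    // Returns -1 if the key has never been recorded.
    long offsetsAheadOfFirstStart(String topic, int partition, String storeName, long startingOffset) {
        Long first = firstStartingOffsets.get(key(topic, partition, storeName));
        return first == null ? -1L : startingOffset - first;
    }
}
```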
> > > > > On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:
> > > > > >
> > > > > > Sophie,
> > > > > >
> > > > > > Thank you very much for such a detailed review of the KIP. It might
> > > > > > actually be longer than the original KIP in the first place!
> > > > > >
> > > > > > 1. Ack'ed and fixed.
> > > > > >
> > > > > > 2. Correct, this is a confusing passage and requires context:
> > > > > >
> > > > > > One thing on our list of TODOs regarding reliability is to determine
> > > > > > how to configure `session.timeout.ms`. In our Kubernetes environment,
> > > > > > an instance of our Streams app can be terminated, restarted, and get
> > > > > > back into the "RUNNING" Streams state in about 20 seconds. We have
> > > > > > two options here:
> > > > > > a) set session.timeout.ms to 30 seconds or so, and deal with 20
> > > > > > seconds of unavailability for affected partitions, but avoid
> > > > > > shuffling Tasks; or
> > > > > > b) set session.timeout.ms to a low value, such as 6 seconds (with
> > > > > > heartbeat.interval.ms of 2000), and reduce the unavailability window
> > > > > > during a rolling bounce, but incur an "extra" rebalance.
> > > > > > There are several different costs to a rebalance, including the
> > > > > > shuffling of standby tasks. JMX metrics are not fine-grained enough
> > > > > > to give us an accurate picture of what's going on with the whole
> > > > > > Standby Task Shuffle Dance. I hypothesize that the Standby Update
> > > > > > Listener might help us clarify just how the shuffling actually (not
> > > > > > theoretically) works, which will help us make a more informed
> > > > > > decision about the session timeout config.
> > > > > >
> > > > > > If you think this is worth putting in the KIP, I'll polish it and do
> > > > > > so; otherwise, I'll remove the current half-baked explanation.
> > > > > >
> > > > > > 3. Overall, I agree with this. In our app, each Task has only one
> > > > > > Store to reduce the number of changelog partitions, so I sometimes
> > > > > > forget the distinction between the two concepts, as reflected in the
> > > > > > KIP (:
> > > > > >
> > > > > > 3a. I don't like the word "Restore" here, since Restoration refers
> > > > > > to an Active Task getting caught up in preparation to resume
> > > > > > processing. `StandbyUpdateListener` is fine by me; I have updated the
> > > > > > KIP. I am a native Python speaker, so I do prefer shorter names
> > > > > > anyway (:
> > > > > >
> > > > > > 3b1. +1 to removing the word 'Task'.
> > > > > >
> > > > > > 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> > > > > > `onStandbyUpdateStart()`, which matches the name of the interface
> > > > > > `StandbyUpdateListener` (the Python part of me hates this, however).
> > > > > >
> > > > > > 3b3. Going back to question 2), `earliestOffset` was intended to
> > > > > > allow us to more easily calculate the amount of state _already
> > > > > > loaded_ in the store by subtracting (startingOffset - earliestOffset).
> > > > > > This would help us see how much inefficiency is introduced in a
> > > > > > rolling restart: if we start with an up-to-date standby before the
> > > > > > restart, and after the whole restart the Task ends up shuffled onto
> > > > > > an instance with no previous state, then that is expensive. However,
> > > > > > if the final shuffling puts the Task back on an instance with a lot
> > > > > > of pre-built state, it's not expensive.
> > > > > >
> > > > > > If a call over the network is required to determine the
> > > > > > earliestOffset, then this is a "hard no-go" for me, and we will
> > > > > > remove it (I'll have to check with Eduwer, as he is close to having a
> > > > > > working implementation). I think we can probably determine what we
> > > > > > wanted to see in a different way, but it will take more thinking. If
> > > > > > `earliestOffset` is confusing, perhaps rename it to
> > > > > > `earliestChangelogOffset`?
> > > > > >
> > > > > > `startingOffset` is easy to remove, as it can be determined from the
> > > > > > first call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> > > > > >
> > > > > > Anyway, I've updated the JavaDoc in the interface; hopefully it's
> > > > > > clearer. Awaiting further instructions here.
> > > > > >
> > > > > > 3c. Good point; after thinking, my preference is `onBatchLoaded()` ->
> > > > > > `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I
> > > > > > am less fond of "processed" because when I was first learning Streams
> > > > > > I mistakenly thought that standby tasks actually processed the input
> > > > > > topic rather than loaded from the changelog. I'll defer to you here.
> > > > > >
> > > > > > 3d. +1 to `onUpdateSuspended()`, or better yet
> > > > > > `onStandbyUpdateSuspended()`. Will check on the implementation of
> > > > > > keeping track of the number of records loaded.
> > > > > >
> > > > > > 4a. I think this might be best in a separate KIP, especially given
> > > > > > that this is my and Eduwer's first time contributing to Kafka (so we
> > > > > > want to minimize the blast radius).
> > > > > >
> > > > > > 4b. I might respectfully (and timidly) push back here: RECYCLED for
> > > > > > an Active Task is a bit confusing to me. DEMOTED and MIGRATED make
> > > > > > sense from the standpoint of an Active Task; recycling, to me, sounds
> > > > > > like throwing stuff away so that the resources (i.e. disk space) can
> > > > > > be used by a separate Task. As an alternative to reusing the same
> > > > > > enum, maybe rename it to `StandbySuspendReason` to avoid naming
> > > > > > conflicts with `ActiveSuspendReason`? However, I could be convinced
> > > > > > to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.
> > > > > >
> > > > > > TLDR:
> > > > > >
> > > > > > T1. Agreed, will remove the word "Task" as it's incorrect.
> > > > > > T2. Will update to `onStandbyUpdateStart()`.
> > > > > > T3. Awaiting further instructions on earliestOffset and startingOffset.
> > > > > > T4. I don't like `onBatchProcessed()` too much; perhaps
> > > > > > `onBatchLoaded()`?
> > > > > > T5. Will update to `onStandbyUpdateSuspended()`.
> > > > > > T6. Thoughts on renaming SuspendReason to StandbySuspendReason,
> > > > > > rather than renaming PROMOTED to RECYCLED? @Eduwer?
> > > > > >
> > > > > > Long Live the Otter,
> > > > > > Colt McNealy
> > > > > >
> > > > > > *Founder, LittleHorse.dev*
> > > > > >
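Colt's point 2 above weighs two settings for `session.timeout.ms`. As a rough Java sketch, the two options can be written as plain `java.util.Properties`; this assumes the consumer settings are passed through the Streams configuration to the embedded consumer, and the helper names are hypothetical. The check encodes the Kafka consumer documentation's guidance that `heartbeat.interval.ms` typically be no higher than one third of `session.timeout.ms`, which option (b)'s 2000 ms / 6000 ms meets exactly.

```java
import java.util.Properties;

// Hypothetical helpers expressing options (a) and (b) from point 2.
class SessionTimeoutOptions {
    // (a) Ride out a ~20 s restart without a rebalance, at the cost of
    // ~20 s of unavailability for the affected partitions.
    static Properties optionA() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    // (b) Notice a bounced instance quickly, at the cost of an "extra"
    // rebalance (and the standby shuffling that comes with it).
    static Properties optionB() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "6000");
        props.setProperty("heartbeat.interval.ms", "2000");
        return props;
    }

    // True if the heartbeat interval is at most 1/3 of the session timeout.
    // Missing keys fall back to 45000 ms / 3000 ms, assumed here to be the
    // consumer defaults of recent Kafka versions.
    static boolean heartbeatWithinGuideline(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms", "45000"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms", "3000"));
        return heartbeat * 3 <= session;
    }
}
```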
> > > > > >
> > > > > > On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> > > > > >
> > > > > > > Hey Colt! Thanks for the KIP -- this will be a great addition to
> > > > > > > Streams; I can't believe we've gone so long without this.
> > > > > > >
> > > > > > > Overall the proposal makes sense, but I had a handful of fairly
> > > > > > > minor questions and suggestions/requests:
> > > > > > >
> > > > > > > 1. Seems like the last sentence in the 2nd paragraph of the
> > > > > > > Motivation section is cut off and incomplete -- "want to be able to
> > > > > > > know" what exactly?
> > > > > > >
> > > > > > > 2. This isn't that important since the motivation as a whole is
> > > > > > > clear to me and convincing enough, but I'm not quite sure I
> > > > > > > understand the example at the end of the Motivation section. How
> > > > > > > are standby tasks (and the ability to hook into and monitor their
> > > > > > > status) related to the session.timeout.ms config?
> > > > > > >
> > > > > > > 3. To help both old and new users of Kafka Streams understand this
> > > > > > > new restore listener and its purpose/semantics, can we try to name
> > > > > > > the class and callbacks in a way that's more consistent with the
> > > > > > > active task restore listener?
> > > > > > >
> > > > > > > 3a. StandbyTaskUpdateListener:
> > > > > > > The existing restore listener is called StateRestoreListener, so
> > > > > > > the new one could be called something like
> > > > > > > StandbyStateRestoreListener. That said, we typically refer to
> > > > > > > standby tasks as "processing" rather than "restoring" records, i.e.
> > > > > > > restoration is a term for active task state specifically. I
> > > > > > > actually like the original suggestion if we just drop the "Task"
> > > > > > > part of the name, i.e. StandbyUpdateListener. I think either that or
> > > > > > > StandbyRestoreListener would be fine and probably the two best
> > > > > > > options.
> > > > > > > Also, this probably goes without saying, but any change to the name
> > > > > > > of this class should of course be reflected in the
> > > > > > > KafkaStreams#setXXX API as well.
> > > > > > >
> > > > > > > 3b. #onTaskCreated
> > > > > > > I know the "start" callback feels a bit different for the standby
> > > > > > > task updater vs an active task beginning restoration, but I think we
> > > > > > > should try to keep the various callbacks aligned to their active
> > > > > > > restore listener counterparts. We can/should just replace the term
> > > > > > > "restore" with "update" in the callback method names, the same way
> > > > > > > we do for the class name, which in this case would give us
> > > > > > > #onUpdateStart. Personally I like this better, but it's ultimately
> > > > > > > up to you. However, I would push back against anything that
> > > > > > > includes the word "Task" (e.g. #onTaskCreated), as the listener is
> > > > > > > actually not scoped to the task itself but instead to the individual
> > > > > > > state store(s). This is the main reason I would prefer calling it
> > > > > > > something like #onUpdateStart, which keeps the focus on the store
> > > > > > > being updated rather than the task that just happens to own this
> > > > > > > store.
> > > > > > > One last thing on this callback -- do we really need both the
> > > > > > > `earliestOffset` and `startingOffset`? I feel like this might be
> > > > > > > more confusing than it is helpful (tbh even I'm not completely sure
> > > > > > > I know what the earliestOffset is supposed to represent). More
> > > > > > > importantly, is this all information that is already available and
> > > > > > > able to be passed in to the callback by Streams? I haven't checked
> > > > > > > on this, but it feels like the earliestOffset is likely to require a
> > > > > > > remote call, either by the embedded consumer or via the admin
> > > > > > > client. If so, the ROI on including this parameter seems quite low
> > > > > > > (if not outright negative).
> > > > > > >
> > > > > > > 3c. #onBatchRestored
> > > > > > > If we opt to use the term "update" in place of "restore" elsewhere,
> > > > > > > then we should consider doing so here as well. What do you think
> > > > > > > about #onBatchUpdated, or even #onBatchProcessed?
> > > > > > > I'm actually not super concerned about this particular API, and
> > > > > > > honestly I think we can use restore or update interchangeably here,
> > > > > > > so if you don't like any of the suggested names (and no one can
> > > > > > > think of anything better), I would just stick with
> > > > > > > #onBatchRestored. In this case, it kind of makes the most sense.
> > > > > > >
> > > > > > > 3d. #onTaskSuspended
> > > > > > > Along the same lines as 3b above, #onUpdateSuspended or just
> > > > > > > #onRestoreSuspended probably makes more sense for this callback.
> > > > > > > Also, I notice the StateRestoreListener passes in the total number
> > > > > > > of records restored to its #onRestoreSuspended. Assuming we already
> > > > > > > track that information in Streams and have it readily available to
> > > > > > > pass in at whatever point we would be invoking this callback, that
> > > > > > > might be a useful parameter for the standby listener to have as
> > > > > > > well.
> > > > > > >
> > > > > > > 4. I totally love the SuspendReason thing, just two notes/requests:
> > > > > > >
> > > > > > > 4a. Feel free to push back against adding onto the scope of this
> > > > > > > KIP, but it would be great to expand the active state restore
> > > > > > > listener with this SuspendReason enum as well. It would be really
> > > > > > > useful for both variants of restore listener.
> > > > > > >
> > > > > > > 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> > > > > > > standby tasks it means basically the same thing; the point is that
> > > > > > > active tasks can also be recycled into standbys through the same
> > > > > > > mechanism. This way they can share the SuspendReason enum -- not
> > > > > > > that it's necessary for them to share, I just think it would be a
> > > > > > > good idea to keep the two restore listeners aligned as closely as
> > > > > > > we can.
> > > > > > > I was actually considering proposing a short KIP with a new
> > > > > > > RecyclingListener (or something) specifically for this exact kind
> > > > > > > of thing, since we currently have literally zero insight into the
> > > > > > > recycling process. It's practically impossible to tell when a store
> > > > > > > has been converted from active to standby, or vice versa. So having
> > > > > > > access to the SuspendReason, and more importantly having a callback
> > > > > > > guaranteed to notify you when a state store is recycled, whether
> > > > > > > active or standby, would be amazing.
> > > > > > >
> > > > > > > Thanks for the KIP!
> > > > > > >
> > > > > > > -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> > > > > > >
> > > > > > >
> > > > > > > ---------- Forwarded message ---------
> > > > > > > > From: Colt McNealy <c...@littlehorse.io>
> > > > > > > > Date: Tue, Oct 3, 2023 at 12:48 PM
> > > > > > > > Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> > > > > > > > To: <dev@kafka.apache.org>
> > > > > > > >
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > We would like to propose a small KIP to improve the ability of
> > > > > > > > Streams apps to monitor the progress of their standby tasks through
> > > > > > > > a callback interface.
> > > > > > > >
> > > > > > > > We have a nearly working implementation on our fork and are
> > > > > > > > curious to hear your feedback.
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> > > > > > > >
> > > > > > > > Thank you,
> > > > > > > > Colt McNealy
> > > > > > > >
> > > > > > > > *Founder, LittleHorse.dev*
> > > > > > > >