Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low impact
on performance or ideally zero impact; unfortunately, I don't see a way to
have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far I can understand, this update
is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateLister once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:

> Thanks for the KIP, Colt and Eduwer,
>
> Are you sure there is also not a significant performance impact for
> passing into the callback `currentEndOffset`?
>
> I am asking because the comment here:
>
> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
>
> says that the end-offset is only updated once for standby tasks whose
> changelog topic is not piggy-backed on input topics. I could also not
> find the update of end-offset for those standbys.
>
>
> Best,
> Bruno
>
> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> > Hi all,
> >
> > it's a nice improvement! I don't have anything to add on top of the
> > previous comments, just came here to say that it seems to me consensus
> > has been reached and the result looks good to me.
> >
> > Thanks Colt and Eduwer!
> > Lucas
> >
> > On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io>
> wrote:
> >>
> >> Thanks, Guozhang. I've updated the KIP and will start a vote.
> >>
> >> Colt McNealy
> >>
> >> *Founder, LittleHorse.dev*
> >>
> >>
> >> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <
> guozhang.wang...@gmail.com>
> >> wrote:
> >>
> >>> Thanks for the summary, that looks good to me.
> >>>
> >>> Guozhang
> >>>
> >>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io>
> wrote:
> >>>>
> >>>> Hello there!
> >>>>
> >>>> Thanks everyone for the comments. There's a lot of back-and-forth
> going
> >>> on,
> >>>> so I'll do my best to summarize what everyone's said in TLDR format:
> >>>>
> >>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> >>> similarly
> >>>> for the other methods.
> >>>> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> >>>> 3. Remove the `earliestOffset` parameter for performance reasons.
> >>>>
> >>>> If that's all fine with everyone, I'll update the KIP and we—well,
> mostly
> >>>> Edu (:  —will open a PR.
> >>>>
> >>>> Cheers,
> >>>> Colt McNealy
> >>>>
> >>>> *Founder, LittleHorse.dev*
> >>>>
> >>>>
> >>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <
> edu...@littlehorse.io>
> >>>> wrote:
> >>>>
> >>>>> Hello everyone,
> >>>>>
> >>>>> Thanks for all your feedback for this KIP!
> >>>>>
> >>>>> I think that the key to choosing proper names for this API is
> >>> understanding
> >>>>> the terms used inside the StoreChangelogReader. Currently, this class
> >>> has
> >>>>> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my
> >>> opinion,
> >>>>> using StandbyUpdateListener for the interface fits better on these
> >>> terms.
> >>>>> Same applies for onUpdateStart/Suspended.
> >>>>>
> >>>>> StoreChangelogReader uses "the same mechanism" for active task
> >>> restoration
> >>>>> and standby task updates, but this is an implementation detail. Under
> >>>>> normal circumstances (no rebalances or task migrations), the
> changelog
> >>>>> reader will be in STANDBY_UPDATING, which means it will be updating
> >>> standby
> >>>>> tasks as long as there are new records in the changelog topic. That's
> >>> why I
> >>>>> prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't
> >>> 100%
> >>>>> align with StateRestoreListener, but either one is fine.
> >>>>>
> >>>>> Edu
> >>>>>
> >>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> >>> guozhang.wang...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Colt,
> >>>>>>
> >>>>>> Thanks for writing the KIP! I have read through the updated KIP and
> >>>>>> overall it looks great. I only have minor naming comments (well,
> >>>>>> aren't naming the least boring stuff to discuss and that takes the
> >>>>>> most of the time for KIPs :P):
> >>>>>>
> >>>>>> 1. I tend to agree with Sophie regarding whether or not to include
> >>>>>> "Standby" in the functions of "onStandbyUpdateStart/Suspended",
> since
> >>>>>> it is also more consistent with the functions of
> >>>>>> "StateRestoreListener" where we do not name it as
> >>>>>> "onStateRestoreState" etc.
> >>>>>>
> >>>>>> 2. I know in community discussions we sometimes say "a standby is
> >>>>>> promoted to active", but in the official code / java docs we did not
> >>>>>> have a term of "promotion", since what the code does is really
> >>> recycle
> >>>>>> the task (while keeping its state stores open), and create a new
> >>>>>> active task that takes in the recycled state stores and just
> changing
> >>>>>> the other fields like task type etc. After thinking about this for a
> >>>>>> bit, I tend to feel that "promoted" is indeed a better name for user
> >>>>>> facing purposes while "recycle" is more of a technical detail inside
> >>>>>> the code and could be abstracted away from users. So I feel keeping
> >>>>>> the name "PROMOTED" is fine.
> >>>>>>
> >>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> >>>>>> avoid another call to the Kafka API. And on the other hand, I also
> >>>>>> tend to think that such bookkeeping may be better done at the app
> >>>>>> level than from the Streams' public API level. I.e. the app could
> >>> keep
> >>>>>> a "first ever starting offset" per "topic-partition-store" key, and
> a
> >>>>>> when we have rolling restart and hence some standby task keeps
> >>>>>> "jumping" from one client to another via task assignment, the app
> >>>>>> would update this value just one when it finds the
> >>>>>> ""topic-partition-store" was never triggered before. What do you
> >>>>>> think?
> >>>>>>
> >>>>>> 4. I do not have a strong opinion either, but what about
> >>>>> "onBatchUpdated" ?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Sohpie—
> >>>>>>>
> >>>>>>> Thank you very much for such a detailed review of the KIP. It might
> >>>>>>> actually be longer than the original KIP in the first place!
> >>>>>>>
> >>>>>>> 1. Ack'ed and fixed.
> >>>>>>>
> >>>>>>> 2. Correct, this is a confusing passage and requires context:
> >>>>>>>
> >>>>>>> One thing on our list of TODO's regarding reliability is to
> >>> determine
> >>>>> how
> >>>>>>> to configure `session.timeout.ms`. In our Kubernetes Environment,
> >>> an
> >>>>>>> instance of our Streams App can be terminated, restarted, and get
> >>> back
> >>>>>> into
> >>>>>>> the "RUNNING" Streams state in about 20 seconds. We have two
> >>> options
> >>>>>> here:
> >>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal with 20
> >>>>> seconds
> >>>>>> of
> >>>>>>> unavailability for affected partitions, but avoid shuffling Tasks;
> >>> or
> >>>>> b)
> >>>>>>> set session.timeout.ms to a low value, such as 6 seconds (
> >>>>>>> heartbeat.interval.ms of 2000), and reduce the unavailability
> >>> window
> >>>>>> during
> >>>>>>> a rolling bounce but incur an "extra" rebalance. There are several
> >>>>>>> different costs to a rebalance, including the shuffling of standby
> >>>>> tasks.
> >>>>>>> JMX metrics are not fine-grained enough to give us an accurate
> >>> picture
> >>>>> of
> >>>>>>> what's going on with the whole Standby Task Shuffle Dance. I
> >>>>> hypothesize
> >>>>>>> that the Standby Update Listener might help us clarify just how the
> >>>>>>> shuffling actually (not theoretically) works, which will help us
> >>> make a
> >>>>>>> more informed decision about the session timeout config.
> >>>>>>>
> >>>>>>> If you think this is worth putting in the KIP, I'll polish it and
> >>> do
> >>>>> so;
> >>>>>>> else, I'll remove the current half-baked explanation.
> >>>>>>>
> >>>>>>> 3. Overall, I agree with this. In our app, each Task has only one
> >>> Store
> >>>>>> to
> >>>>>>> reduce the number of changelog partitions, so I sometimes forget
> >>> the
> >>>>>>> distinction between the two concepts, as reflected in the KIP (:
> >>>>>>>
> >>>>>>> 3a. I don't like the word "Restore" here, since Restoration refers
> >>> to
> >>>>> an
> >>>>>>> Active Task getting caught up in preparation to resume processing.
> >>>>>>> `StandbyUpdateListener` is fine by me; I have updated the KIP. I
> >>> am a
> >>>>>>> native Python speaker so I do prefer shorter names anyways (:
> >>>>>>>
> >>>>>>> 3b1. +1 to removing the word 'Task'.
> >>>>>>>
> >>>>>>> 3b2. I like `onUpdateStart()`, but with your permission I'd prefer
> >>>>>>> `onStandbyUpdateStart()` which matches the name of the Interface
> >>>>>>> "StandbyUpdateListener". (the python part of me hates this,
> >>> however)
> >>>>>>>
> >>>>>>> 3b3. Going back to question 2), `earliestOffset` was intended to
> >>> allow
> >>>>> us
> >>>>>>> to more easily calculate the amount of state _already loaded_ in
> >>> the
> >>>>>> store
> >>>>>>> by subtracting (startingOffset - earliestOffset). This would help
> >>> us
> >>>>> see
> >>>>>>> how much inefficiency is introduced in a rolling restart—if we end
> >>> up
> >>>>>> going
> >>>>>>> from a situation with an up-to-date standby before the restart, and
> >>>>> then
> >>>>>>> after the whole restart, the Task is shuffled onto an instance
> >>> where
> >>>>>> there
> >>>>>>> is no previous state, then that is expensive. However, if the final
> >>>>>>> shuffling results in the Task back on an instance with a lot of
> >>>>> pre-built
> >>>>>>> state, it's not expensive.
> >>>>>>>
> >>>>>>> If a call over the network is required to determine the
> >>> earliestOffset,
> >>>>>>> then this is a "hard no-go" for me, and we will remove it (I'll
> >>> have to
> >>>>>>> check with Eduwer as he is close to having a working
> >>> implementation). I
> >>>>>>> think we can probably determine what we wanted to see in a
> >>> different
> >>>>>>> way, but it will take more thinking.. If `earliestOffset` is
> >>> confusing,
> >>>>>>> perhaps rename it to `earliestChangelogOffset`?
> >>>>>>>
> >>>>>>> `startingOffset` is easy to remove as it can be determined from the
> >>>>> first
> >>>>>>> call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> >>>>>>>
> >>>>>>> Anyways, I've updated the JavaDoc in the interface; hopefully it's
> >>> more
> >>>>>>> clear. Awaiting further instructions here.
> >>>>>>>
> >>>>>>> 3c. Good point; after thinking, my preference is
> >>> `onBatchLoaded()`  ->
> >>>>>>> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`.
> >>> I am
> >>>>>>> less fond of "processed" because when I was first learning Streams
> >>> I
> >>>>>>> mistakenly thought that standby tasks actually processed the input
> >>>>> topic
> >>>>>>> rather than loaded from the changelog. I'll defer to you here.
> >>>>>>>
> >>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet
> >>>>>>> `onStandbyUpdateSuspended()`. Will check about the implementation
> >>> of
> >>>>>>> keeping track of the number of records loaded.
> >>>>>>>
> >>>>>>> 4a. I think this might be best in a separate KIP, especially given
> >>> that
> >>>>>>> this is my and Eduwer's first time contributing to Kafka (so we
> >>> want to
> >>>>>>> minimize the blast radius).
> >>>>>>>
> >>>>>>> 4b. I might respectfully (and timidly) push back here, RECYCLED
> >>> for an
> >>>>>>> Active Task is a bit confusing to me. DEMOTED and MIGRATED make
> >>> sense
> >>>>>> from
> >>>>>>> the standpoint of an Active Task, recycling to me sounds like
> >>> throwing
> >>>>>>> stuff away, such that the resources (i.e. disk space) can be used
> >>> by a
> >>>>>>> separate Task. As an alternative rather than trying to reuse the
> >>> same
> >>>>>> enum,
> >>>>>>> maybe rename it to `StandbySuspendReason` to avoid naming conflicts
> >>>>> with
> >>>>>>> `ActiveSuspendReason`? However, I could be convinced to rename
> >>> PROMOTED
> >>>>>> ->
> >>>>>>> RECYCLED, especially if Eduwer agrees.
> >>>>>>>
> >>>>>>> TLDR:
> >>>>>>>
> >>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> >>>>>>> T2. Will update to `onStandbyUpdateStart()`
> >>>>>>> T3. Awaiting further instructions on earliestOffset and
> >>> startingOffset.
> >>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps
> >>>>>> `onBatchLoaded()`?
> >>>>>>> T5. Will update to `onStandbyUpdateSuspended()`
> >>>>>>> T6. Thoughts on renaming SuspendReason to StandbySuspendReason,
> >>> rather
> >>>>>> than
> >>>>>>> renaming PROMOTED to RECYCLED? @Eduwer?
> >>>>>>>
> >>>>>>> Long Live the Otter,
> >>>>>>> Colt McNealy
> >>>>>>>
> >>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <
> >>>>>> sop...@responsive.dev>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great addition to
> >>>>>> Streams, I
> >>>>>>>> can't believe we've gone so long without this.
> >>>>>>>>
> >>>>>>>> Overall the proposal makes sense, but I had a handful of fairly
> >>> minor
> >>>>>>>> questions and suggestions/requests
> >>>>>>>>
> >>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the
> >>>>> Motivation
> >>>>>>>> section is cut off and incomplete -- "want to be able to know "
> >>> what
> >>>>>>>> exactly?
> >>>>>>>>
> >>>>>>>> 2. This isn't that important since the motivation as a whole is
> >>> clear
> >>>>>> to me
> >>>>>>>> and convincing enough, but I'm not quite sure I understand the
> >>>>> example
> >>>>>> at
> >>>>>>>> the end of the Motivation section. How are standby tasks (and the
> >>>>>> ability
> >>>>>>>> to hook into and monitor their status) related to the
> >>>>>> session.timeout.ms
> >>>>>>>> config?
> >>>>>>>>
> >>>>>>>> 3. To help both old and new users of Kafka Streams understand
> >>> this
> >>>>> new
> >>>>>>>> restore listener and its purpose/semantics, can we try to name
> >>> the
> >>>>>> class
> >>>>>>>> and
> >>>>>>>>   callbacks in a way that's more consistent with the active task
> >>>>> restore
> >>>>>>>> listener?
> >>>>>>>>
> >>>>>>>> 3a. StandbyTaskUpdateListener:
> >>>>>>>> The existing restore listener is called StateRestoreListener, so
> >>> the
> >>>>>> new
> >>>>>>>> one could be called something like StandbyStateRestoreListener.
> >>>>>> Although
> >>>>>>>> we typically refer to standby tasks as "processing" rather than
> >>>>>> "restoring"
> >>>>>>>> records -- ie restoration is a term for active task state
> >>>>>> specifically. I
> >>>>>>>> actually
> >>>>>>>> like the original suggestion if we just drop the "Task" part of
> >>> the
> >>>>>> name,
> >>>>>>>> ie StandbyUpdateListener. I think either that or
> >>>>> StandbyRestoreListener
> >>>>>>>> would be fine and probably the two best options.
> >>>>>>>> Also, this probably goes without saying but any change to the
> >>> name of
> >>>>>> this
> >>>>>>>> class should of course be reflected in the KafkaStreams#setXXX
> >>> API as
> >>>>>> well
> >>>>>>>>
> >>>>>>>> 3b. #onTaskCreated
> >>>>>>>>   I know the "start" callback feels a bit different for the
> >>> standby
> >>>>> task
> >>>>>>>> updater vs an active task beginning restoration, but I think we
> >>>>> should
> >>>>>> try
> >>>>>>>> to
> >>>>>>>> keep the various callbacks aligned to their active restore
> >>> listener
> >>>>>>>> counterpart. We can/should just replace the term "restore" with
> >>>>>> "update"
> >>>>>>>> for the
> >>>>>>>> callback method names the same way we do for the class name,
> >>> which in
> >>>>>> this
> >>>>>>>> case would give us #onUpdateStart. Personally I like this better,
> >>>>>>>> but it's ultimately up to you. However, I would push back against
> >>>>>> anything
> >>>>>>>> that includes the word "Task" (eg #onTaskCreated) as the listener
> >>>>>>>>   is actually not scoped to the task itself but instead to the
> >>>>>> individual
> >>>>>>>> state store(s). This is the main reason I would prefer calling it
> >>>>>> something
> >>>>>>>> like #onUpdateStart, which keeps the focus on the store being
> >>> updated
> >>>>>>>> rather than the task that just happens to own this store
> >>>>>>>> One last thing on this callback -- do we really need both the
> >>>>>>>> `earliestOffset` and `startingOffset`? I feel like this might be
> >>> more
> >>>>>>>> confusing than it
> >>>>>>>> is helpful (tbh even I'm not completely sure I know what the
> >>>>>> earliestOffset
> >>>>>>>> is supposed to represent) More importantly, is this all
> >>> information
> >>>>>>>> that is already available and able to be passed in to the
> >>> callback by
> >>>>>>>> Streams? I haven't checked on this but it feels like the
> >>>>>> earliestOffset is
> >>>>>>>> likely to require a remote call, either by the embedded consumer
> >>> or
> >>>>>> via the
> >>>>>>>> admin client. If so, the ROI on including this parameter seems
> >>>>>>>> quite low (if not outright negative)
> >>>>>>>>
> >>>>>>>> 3c. #onBatchRestored
> >>>>>>>> If we opt to use the term "update" in place of "restore"
> >>> elsewhere,
> >>>>>> then we
> >>>>>>>> should consider doing so here as well. What do you think about
> >>>>>>>> #onBatchUpdated, or even #onBatchProcessed?
> >>>>>>>> I'm actually not super concerned about this particular API, and
> >>>>>> honestly I
> >>>>>>>> think we can use restore or update interchangeably here, so if
> >>> you
> >>>>>>>>   don't like any of the suggested names (and no one can think of
> >>>>>> anything
> >>>>>>>> better), I would just stick with #onBatchRestored. In this case,
> >>>>>>>> it kind of makes the most sense.
> >>>>>>>>
> >>>>>>>> 3d. #onTaskSuspended
> >>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or just
> >>>>>>>> #onRestoreSuspended probably makes more sense for this callback.
> >>>>> Also,
> >>>>>>>>   I notice the StateRestoreListener passes in the total number of
> >>>>>> records
> >>>>>>>> restored to its #onRestoreSuspended. Assuming we already track
> >>>>>>>> that information in Streams and have it readily available to
> >>> pass in
> >>>>> at
> >>>>>>>> whatever point we would be invoking this callback, that might be
> >>> a
> >>>>>>>> useful  parameter for the standby listener to have as well
> >>>>>>>>
> >>>>>>>> 4. I totally love the SuspendReason thing, just two
> >>> notes/requests:
> >>>>>>>>
> >>>>>>>> 4a. Feel free to push back against adding onto the scope of this
> >>> KIP,
> >>>>>> but
> >>>>>>>> it would be great to expand the active state restore listener
> >>> with
> >>>>> this
> >>>>>>>> SuspendReason enum as well. It would be really useful for both
> >>>>>> variants of
> >>>>>>>> restore listener
> >>>>>>>>
> >>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for
> >>>>> standby
> >>>>>>>> tasks it means basically the same thing, the point is that active
> >>>>>>>> tasks can also be recycled into standbys through the same
> >>> mechanism.
> >>>>>> This
> >>>>>>>> way they can share the SuspendReason enum -- not that it's
> >>>>>>>> necessary for them to share, I just think it would be a good
> >>> idea to
> >>>>>> keep
> >>>>>>>> the two restore listeners aligned to the highest degree possible
> >>> for
> >>>>> as
> >>>>>>>> we can.
> >>>>>>>> I was actually considering proposing a short KIP with a new
> >>>>>>>> RecyclingListener (or something) specifically for this exact
> >>> kind of
> >>>>>> thing,
> >>>>>>>> since we
> >>>>>>>> currently have literally zero insight into the recycling process.
> >>>>> It's
> >>>>>>>> practically impossible to tell when a store has been converted
> >>> from
> >>>>>> active
> >>>>>>>> to
> >>>>>>>> standby, or vice versa. So having access to the SuspendReason,
> >>> and
> >>>>> more
> >>>>>>>> importantly having a callback guaranteed to notify you when a
> >>>>>>>> state store is recycled whether active or standby, would be
> >>> amazing.
> >>>>>>>>
> >>>>>>>> Thanks for the KIP!
> >>>>>>>>
> >>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> ---------- Forwarded message ---------
> >>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> >>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> >>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
> >>>>>>>>> To: <dev@kafka.apache.org>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> We would like to propose a small KIP to improve the ability of
> >>>>>> Streams
> >>>>>>>> apps
> >>>>>>>>> to monitor the progress of their standby tasks through a
> >>> callback
> >>>>>>>>> interface.
> >>>>>>>>>
> >>>>>>>>> We have a nearly-working implementation on our fork and are
> >>> curious
> >>>>>> for
> >>>>>>>>> feedback.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>> Colt McNealy
> >>>>>>>>>
> >>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>
>

Reply via email to