Just want to checkpoint the current state of this KIP and make sure we're
on track to get it into 3.7 (we still have a few weeks). It looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information we can pass in that would strike a good
balance between being helpful and not hurting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you, Colt, is fine.

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate focusing
on just the basic feature for now and dropping the end-offset from the
callback. Once we have a standby listener it will be easy to expand on it
with a followup KIP if/when we find an efficient way to add additional
useful information. I think it will also become clearer what is and isn't
useful after more people start using it in the real world.
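
To make the reduced scope concrete, here is a rough sketch of what the
listener could look like without the end offset. This is only an
illustration, not the KIP's final API: the method and enum names come from
this thread, while the parameter lists, the plain String standing in for a
TopicPartition, and the RecordingListener class are my own assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: names are taken from the discussion, signatures are assumed.
enum SuspendReason { MIGRATED, PROMOTED }

interface StandbyUpdateListener {
    // A standby begins (or resumes) updating a store from its changelog.
    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

    // The middle callback; its final name is still open (onBatchLoaded vs onBatchUpdated).
    // No end offset is passed, so no extra remote call is needed to invoke it.
    void onBatchLoaded(String changelogPartition, String storeName,
                       long batchEndOffset, long batchSize);

    // The standby stops updating this store, with the reason why.
    void onUpdateSuspended(String changelogPartition, String storeName,
                           long storeOffset, SuspendReason reason);
}

// Assumed example implementation: counts loaded records and logs each event.
final class RecordingListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();
    long recordsLoaded = 0L;

    @Override
    public void onUpdateStart(String changelogPartition, String storeName, long startingOffset) {
        events.add("start " + storeName + "@" + startingOffset);
    }

    @Override
    public void onBatchLoaded(String changelogPartition, String storeName,
                              long batchEndOffset, long batchSize) {
        recordsLoaded += batchSize;
        events.add("batch " + storeName + "@" + batchEndOffset);
    }

    @Override
    public void onUpdateSuspended(String changelogPartition, String storeName,
                                  long storeOffset, SuspendReason reason) {
        events.add("suspended " + storeName + "@" + storeOffset + " " + reason);
    }
}
```

If an efficient source for the end offset turns up later, a followup KIP
could pass it in as an additional parameter on the middle callback.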

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog. It's possible
this does not represent the most up-to-date end offset (I'm not sure
whether it does), but it should be close enough to be reliable and useful
for the purpose of monitoring -- I mean, it is a metric, after all.
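
For anyone who wants a number rather than a raw lag, Matthias's idea quoted
below (combine `currentLag()` with `position()`, or with the offset of the
last read record) boils down to simple arithmetic. A minimal sketch,
assuming the lag is defined as end offset minus position and may be
unknown; the class and method names here are made up for illustration and
are not part of any Kafka API:

```java
import java.util.OptionalLong;

// Hypothetical helper, not a Kafka class: derives an approximate end offset
// from a locally known position and an optional lag value.
final class EndOffsetEstimate {
    private EndOffsetEstimate() {}

    // position is the offset of the next record to be read.
    // Returns empty when the lag is unknown or either input is negative.
    static OptionalLong fromPosition(long position, OptionalLong lag) {
        if (!lag.isPresent() || lag.getAsLong() < 0 || position < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + lag.getAsLong());
    }

    // Same estimate, starting from the offset of the last record that was read;
    // the position is one past that offset.
    static OptionalLong fromLastReadOffset(long lastReadOffset, OptionalLong lag) {
        return fromPosition(lastReadOffset + 1, lag);
    }
}
```

The result is only as fresh as the lag value it was computed from, so it is
suited to monitoring rather than to anything that needs an exact end offset.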

Hope this helps -- in the end, it's up to you (Colt) to decide what you
want to bring in scope or not. We still have more than 3 weeks until the
KIP freeze as currently proposed, so in theory you could even implement
this KIP without the end offset and then do a followup KIP to add the end
offset within the same release, i.e. without any deprecations. There are
plenty of paths forward here, so don't let us drag this out forever if you
know what you want.

Cheers,
Sophie

On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax <mj...@apache.org> wrote:

> Forgot one thing:
>
> We could also pass `currentLag()` into `onBatchLoaded()` instead of
> end-offset.
>
>
> -Matthias
>
> On 10/20/23 10:56 AM, Matthias J. Sax wrote:
> > Thanks for digging into this Bruno.
> >
> > The JavaDoc on the consumer does not say anything specific about
> > `endOffset` guarantees:
> >
> >> Get the end offsets for the given partitions. In the default {@code
> >> read_uncommitted} isolation level, the end
> >> offset is the high watermark (that is, the offset of the last
> >> successfully replicated message plus one). For
> >> {@code read_committed} consumers, the end offset is the last stable
> >> offset (LSO), which is the minimum of
> >> the high watermark and the smallest offset of any open transaction.
> >> Finally, if the partition has never been
> >> written to, the end offset is 0.
> >
> > Thus, I actually believe that it would be ok to change the
> > implementation and serve the answer from the `TopicPartitionState`?
> >
> > Another idea would be, to use `currentLag()` in combination with
> > `position()` (or the offset of the last read record) to compute the
> > end-offset on the fly?
> >
> >
> > -Matthias
> >
> > On 10/20/23 4:00 AM, Bruno Cadonna wrote:
> >> Hi,
> >>
> >> Matthias is correct that the end offsets are stored somewhere in the
> >> metadata of the consumer. More precisely, they are stored in the
> >> `TopicPartitionState`. However, I could not find public API on the
> >> consumer other than currentLag() that uses the stored end offsets. If
> >> I understand the code correctly, method endOffsets() always triggers a
> >> remote call.
> >>
> >> I am a bit concerned about doing remote calls every commit.interval.ms
> >> (by default 200ms under EOS). At the moment the remote calls are only
> >> issued if an optimization for KTables is turned on where changelog
> >> topics are replaced with the input topic of the KTable. The current
> >> remote calls retrieve all committed offsets of the group at once. If I
> >> understand correctly, that is one single remote call. Remote calls for
> >> getting end offsets of changelog topics -- as I understand you are
> >> planning to issue -- will probably result in multiple remote calls to
> >> multiple leaders of the changelog topic partitions.
> >>
> >> Please correct me if I misunderstood anything of the above.
> >>
> >> If my understanding is correct, I propose to modify the consumer in
> >> such a way to get the end offset from the locally stored metadata
> >> whenever possible as part of the implementation of this KIP. I do not
> >> know what the implications are of such a change of the consumer and if
> >> a KIP is needed for it. Maybe, endOffsets() guarantees to return the
> >> freshest end offsets possible, which would not be satisfied with the
> >> modification.
> >>
> >> Regarding the naming, I do not completely agree with Matthias. While
> >> the pattern might be consistent with onBatchUpdated, what is the
> >> meaning of onBatchUpdated? Is the batch updated? The names
> >> onBatchLoaded or onBatchWritten or onBatchAdded are more clear IMO.
> >> With "restore" the pattern works better. If I restore a batch of
> >> records in a state, the records are not there although they should be
> >> there and I add them. If I update a batch of records in a state, this
> >> sounds like the batch of records is in the state and I modify the
> >> existing records within the state. That is clearly not the meaning of
> >> the event for which the listener should be called.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 10/19/23 2:12 AM, Matthias J. Sax wrote:
> >>> Thanks for the KIP. Seems I am almost late to the party.
> >>>
> >>> About naming (fun, fun, fun): I like the current proposal overall,
> >>> except `onBatchLoaded`; I would prefer `onBatchUpdated`. It better
> >>> aligns to everything else:
> >>>
> >>>   - it's an update-listener, not loaded-listener
> >>>   - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`,
> >>> `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
> >>>   - `StandbyUpdateListener` should have `onUpdateStart`,
> >>> `onUpdateSuspended` and `onBatchUpdated`  to be equally consistent
> >>> (using "loaded" breaks the pattern)
> >>>
> >>>
> >>> About the end-offset question: I am relatively sure that the consumer
> >>> gets the latest end-offset as attached metadata in every fetch
> >>> response. (We exploit this behavior to track end-offsets for input
> >>> topic with regard to `max.task.idle.ms` without overhead -- it was
> >>> also a concern when we did the corresponding KIP how we could track
> >>> lag with no overhead).
> >>>
> >>> Thus, I believe we would "just" need to modify the code accordingly
> >>> to get this information from the restore-consumer
> >>> (`restoreConsumer.endOffsets(...)`; should be served w/o RPC but from
> >>> internal metadata cache) for free, and pass into the listener.
> >>>
> >>> Please double check / verify this claim and keep me honest about it.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
> >>>> Hi Bruno,
> >>>>
> >>>> Thanks for your observation; it will surely require a network call using
> >>>> the admin client in order to know this "endOffset", and that will have an
> >>>> impact on performance. We can look for a solution that has a low impact
> >>>> on performance, or ideally zero impact; unfortunately, I don't see a way
> >>>> to have zero impact. However, we can leverage the existing
> >>>> #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
> >>>> client to ask for these "endOffset"s. As far as I can understand, this
> >>>> update is done periodically using the "commit.interval.ms" configuration.
> >>>> I believe this option will force us to invoke StandbyUpdateListener once
> >>>> this interval is reached.
> >>>>
> >>>> On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks for the KIP, Colt and Eduwer,
> >>>>>
> >>>>> Are you sure there is also not a significant performance impact for
> >>>>> passing into the callback `currentEndOffset`?
> >>>>>
> >>>>> I am asking because the comment here:
> >>>>>
> >>>>>
> https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129
> >>>>>
> >>>>> says that the end-offset is only updated once for standby tasks whose
> >>>>> changelog topic is not piggy-backed on input topics. I could also not
> >>>>> find the update of end-offset for those standbys.
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 10/16/23 10:55 AM, Lucas Brutschy wrote:
> >>>>>> Hi all,
> >>>>>>
> >>>>>> it's a nice improvement! I don't have anything to add on top of the
> >>>>>> previous comments, just came here to say that it seems to me
> >>>>>> consensus
> >>>>>> has been reached and the result looks good to me.
> >>>>>>
> >>>>>> Thanks Colt and Eduwer!
> >>>>>> Lucas
> >>>>>>
> >>>>>> On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Thanks, Guozhang. I've updated the KIP and will start a vote.
> >>>>>>>
> >>>>>>> Colt McNealy
> >>>>>>>
> >>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <
> >>>>> guozhang.wang...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the summary, that looks good to me.
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io
> >
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello there!
> >>>>>>>>>
> >>>>>>>>> Thanks everyone for the comments. There's a lot of back-and-forth
> >>>>> going
> >>>>>>>> on,
> >>>>>>>>> so I'll do my best to summarize what everyone's said in TLDR
> >>>>>>>>> format:
> >>>>>>>>>
> >>>>>>>>> 1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`,  and do
> >>>>>>>> similarly
> >>>>>>>>> for the other methods.
> >>>>>>>>> 2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
> >>>>>>>>> 3. Remove the `earliestOffset` parameter for performance reasons.
> >>>>>>>>>
> >>>>>>>>> If that's all fine with everyone, I'll update the KIP and
> we—well,
> >>>>> mostly
> >>>>>>>>> Edu (:  —will open a PR.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Colt McNealy
> >>>>>>>>>
> >>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <
> >>>>> edu...@littlehorse.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello everyone,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for all your feedback for this KIP!
> >>>>>>>>>>
> >>>>>>>>>> I think that the key to choosing proper names for this API is
> >>>>>>>> understanding
> >>>>>>>>>> the terms used inside the StoreChangelogReader. Currently,
> >>>>>>>>>> this class
> >>>>>>>> has
> >>>>>>>>>> two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In
> my
> >>>>>>>> opinion,
> >>>>>>>>>> using StandbyUpdateListener for the interface fits better on
> >>>>>>>>>> these
> >>>>>>>> terms.
> >>>>>>>>>> Same applies for onUpdateStart/Suspended.
> >>>>>>>>>>
> >>>>>>>>>> StoreChangelogReader uses "the same mechanism" for active task
> >>>>>>>> restoration
> >>>>>>>>>> and standby task updates, but this is an implementation
> >>>>>>>>>> detail. Under
> >>>>>>>>>> normal circumstances (no rebalances or task migrations), the
> >>>>> changelog
> >>>>>>>>>> reader will be in STANDBY_UPDATING, which means it will be
> >>>>>>>>>> updating
> >>>>>>>> standby
> >>>>>>>>>> tasks as long as there are new records in the changelog topic.
> >>>>>>>>>> That's
> >>>>>>>> why I
> >>>>>>>>>> prefer onStandbyUpdated instead of onBatchUpdated, even if it
> >>>>>>>>>> doesn't
> >>>>>>>> 100%
> >>>>>>>>>> align with StateRestoreListener, but either one is fine.
> >>>>>>>>>>
> >>>>>>>>>> Edu
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <
> >>>>>>>> guozhang.wang...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Colt,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for writing the KIP! I have read through the updated KIP and
> >>>>>>>>>>> overall it looks great. I only have minor naming comments (well,
> >>>>>>>>>>> isn't naming the least boring stuff to discuss, and the thing that
> >>>>>>>>>>> takes the most time for KIPs :P):
> >>>>>>>>>>>
> >>>>>>>>>>> 1. I tend to agree with Sophie regarding whether or not to
> >>>>>>>>>>> include
> >>>>>>>>>>> "Standby" in the functions of "onStandbyUpdateStart/Suspended",
> >>>>> since
> >>>>>>>>>>> it is also more consistent with the functions of
> >>>>>>>>>>> "StateRestoreListener" where we do not name it as
> >>>>>>>>>>> "onStateRestoreState" etc.
> >>>>>>>>>>>
> >>>>>>>>>>> 2. I know in community discussions we sometimes say "a
> >>>>>>>>>>> standby is
> >>>>>>>>>>> promoted to active", but in the official code / java docs we
> >>>>>>>>>>> did not
> >>>>>>>>>>> have a term of "promotion", since what the code does is really
> >>>>>>>> recycle
> >>>>>>>>>>> the task (while keeping its state stores open), and create a
> new
> >>>>>>>>>>> active task that takes in the recycled state stores and just
> >>>>> changing
> >>>>>>>>>>> the other fields like task type etc. After thinking about
> >>>>>>>>>>> this for a
> >>>>>>>>>>> bit, I tend to feel that "promoted" is indeed a better name
> >>>>>>>>>>> for user
> >>>>>>>>>>> facing purposes while "recycle" is more of a technical detail
> >>>>>>>>>>> inside
> >>>>>>>>>>> the code and could be abstracted away from users. So I feel
> >>>>>>>>>>> keeping
> >>>>>>>>>>> the name "PROMOTED" is fine.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Regarding "earliestOffset", it does feel like we cannot always
> >>>>>>>>>>> avoid another call to the Kafka API. And on the other hand, I also
> >>>>>>>>>>> tend to think that such bookkeeping may be better done at the app
> >>>>>>>>>>> level than from the Streams' public API level. I.e. the app could
> >>>>>>>>>>> keep a "first ever starting offset" per "topic-partition-store" key,
> >>>>>>>>>>> and when we have a rolling restart and hence some standby task keeps
> >>>>>>>>>>> "jumping" from one client to another via task assignment, the app
> >>>>>>>>>>> would update this value just once, when it finds the
> >>>>>>>>>>> "topic-partition-store" was never triggered before. What do you
> >>>>>>>>>>> think?
> >>>>>>>>>>>
> >>>>>>>>>>> 4. I do not have a strong opinion either, but what about
> >>>>>>>>>> "onBatchUpdated" ?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy
> >>>>>>>>>>> <c...@littlehorse.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sophie --
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you very much for such a detailed review of the KIP.
> >>>>>>>>>>>> It might
> >>>>>>>>>>>> actually be longer than the original KIP in the first place!
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Ack'ed and fixed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Correct, this is a confusing passage and requires context:
> >>>>>>>>>>>>
> >>>>>>>>>>>> One thing on our list of TODO's regarding reliability is to
> >>>>>>>> determine
> >>>>>>>>>> how
> >>>>>>>>>>>> to configure `session.timeout.ms`. In our Kubernetes
> >>>>>>>>>>>> Environment,
> >>>>>>>> an
> >>>>>>>>>>>> instance of our Streams App can be terminated, restarted,
> >>>>>>>>>>>> and get
> >>>>>>>> back
> >>>>>>>>>>> into
> >>>>>>>>>>>> the "RUNNING" Streams state in about 20 seconds. We have two
> >>>>>>>> options
> >>>>>>>>>>> here:
> >>>>>>>>>>>> a) set session.timeout.ms to 30 seconds or so, and deal with
> 20
> >>>>>>>>>> seconds
> >>>>>>>>>>> of
> >>>>>>>>>>>> unavailability for affected partitions, but avoid shuffling
> >>>>>>>>>>>> Tasks;
> >>>>>>>> or
> >>>>>>>>>> b)
> >>>>>>>>>>>> set session.timeout.ms to a low value, such as 6 seconds (
> >>>>>>>>>>>> heartbeat.interval.ms of 2000), and reduce the unavailability
> >>>>>>>> window
> >>>>>>>>>>> during
> >>>>>>>>>>>> a rolling bounce but incur an "extra" rebalance. There are
> >>>>>>>>>>>> several
> >>>>>>>>>>>> different costs to a rebalance, including the shuffling of
> >>>>>>>>>>>> standby
> >>>>>>>>>> tasks.
> >>>>>>>>>>>> JMX metrics are not fine-grained enough to give us an accurate
> >>>>>>>> picture
> >>>>>>>>>> of
> >>>>>>>>>>>> what's going on with the whole Standby Task Shuffle Dance. I
> >>>>>>>>>> hypothesize
> >>>>>>>>>>>> that the Standby Update Listener might help us clarify just
> >>>>>>>>>>>> how the
> >>>>>>>>>>>> shuffling actually (not theoretically) works, which will
> >>>>>>>>>>>> help us
> >>>>>>>> make a
> >>>>>>>>>>>> more informed decision about the session timeout config.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you think this is worth putting in the KIP, I'll polish
> >>>>>>>>>>>> it and
> >>>>>>>> do
> >>>>>>>>>> so;
> >>>>>>>>>>>> else, I'll remove the current half-baked explanation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. Overall, I agree with this. In our app, each Task has
> >>>>>>>>>>>> only one
> >>>>>>>> Store
> >>>>>>>>>>> to
> >>>>>>>>>>>> reduce the number of changelog partitions, so I sometimes
> >>>>>>>>>>>> forget
> >>>>>>>> the
> >>>>>>>>>>>> distinction between the two concepts, as reflected in the
> >>>>>>>>>>>> KIP (:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3a. I don't like the word "Restore" here, since Restoration
> >>>>>>>>>>>> refers
> >>>>>>>> to
> >>>>>>>>>> an
> >>>>>>>>>>>> Active Task getting caught up in preparation to resume
> >>>>>>>>>>>> processing.
> >>>>>>>>>>>> `StandbyUpdateListener` is fine by me; I have updated the
> >>>>>>>>>>>> KIP. I
> >>>>>>>> am a
> >>>>>>>>>>>> native Python speaker so I do prefer shorter names anyways (:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3b1. +1 to removing the word 'Task'.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3b2. I like `onUpdateStart()`, but with your permission I'd
> >>>>>>>>>>>> prefer
> >>>>>>>>>>>> `onStandbyUpdateStart()` which matches the name of the
> >>>>>>>>>>>> Interface
> >>>>>>>>>>>> "StandbyUpdateListener". (the python part of me hates this,
> >>>>>>>> however)
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3b3. Going back to question 2), `earliestOffset` was
> >>>>>>>>>>>> intended to
> >>>>>>>> allow
> >>>>>>>>>> us
> >>>>>>>>>>>> to more easily calculate the amount of state _already
> >>>>>>>>>>>> loaded_ in
> >>>>>>>> the
> >>>>>>>>>>> store
> >>>>>>>>>>>> by subtracting (startingOffset - earliestOffset). This would
> >>>>>>>>>>>> help
> >>>>>>>> us
> >>>>>>>>>> see
> >>>>>>>>>>>> how much inefficiency is introduced in a rolling restart—if
> >>>>>>>>>>>> we end
> >>>>>>>> up
> >>>>>>>>>>> going
> >>>>>>>>>>>> from a situation with an up-to-date standby before the
> >>>>>>>>>>>> restart, and
> >>>>>>>>>> then
> >>>>>>>>>>>> after the whole restart, the Task is shuffled onto an instance
> >>>>>>>> where
> >>>>>>>>>>> there
> >>>>>>>>>>>> is no previous state, then that is expensive. However, if
> >>>>>>>>>>>> the final
> >>>>>>>>>>>> shuffling results in the Task back on an instance with a lot
> of
> >>>>>>>>>> pre-built
> >>>>>>>>>>>> state, it's not expensive.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If a call over the network is required to determine the
> >>>>>>>> earliestOffset,
> >>>>>>>>>>>> then this is a "hard no-go" for me, and we will remove it
> (I'll
> >>>>>>>> have to
> >>>>>>>>>>>> check with Eduwer as he is close to having a working
> >>>>>>>> implementation). I
> >>>>>>>>>>>> think we can probably determine what we wanted to see in a
> >>>>>>>> different
> >>>>>>>>>>>> way, but it will take more thinking.. If `earliestOffset` is
> >>>>>>>> confusing,
> >>>>>>>>>>>> perhaps rename it to `earliestChangelogOffset`?
> >>>>>>>>>>>>
> >>>>>>>>>>>> `startingOffset` is easy to remove as it can be determined
> >>>>>>>>>>>> from the
> >>>>>>>>>> first
> >>>>>>>>>>>> call to `onBatch{Restored/Updated/Processed/Loaded}()`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Anyways, I've updated the JavaDoc in the interface;
> >>>>>>>>>>>> hopefully it's
> >>>>>>>> more
> >>>>>>>>>>>> clear. Awaiting further instructions here.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3c. Good point; after thinking, my preference is
> >>>>>>>> `onBatchLoaded()`  ->
> >>>>>>>>>>>> `onBatchUpdated()` -> `onBatchProcessed()` ->
> >>>>>>>>>>>> `onBatchRestored()`.
> >>>>>>>> I am
> >>>>>>>>>>>> less fond of "processed" because when I was first learning
> >>>>>>>>>>>> Streams
> >>>>>>>> I
> >>>>>>>>>>>> mistakenly thought that standby tasks actually processed the
> >>>>>>>>>>>> input
> >>>>>>>>>> topic
> >>>>>>>>>>>> rather than loaded from the changelog. I'll defer to you here.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3d. +1 to `onUpdateSuspended()`, or better yet
> >>>>>>>>>>>> `onStandbyUpdateSuspended()`. Will check about the
> >>>>>>>>>>>> implementation
> >>>>>>>> of
> >>>>>>>>>>>> keeping track of the number of records loaded.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4a. I think this might be best in a separate KIP, especially
> >>>>>>>>>>>> given
> >>>>>>>> that
> >>>>>>>>>>>> this is my and Eduwer's first time contributing to Kafka (so
> we
> >>>>>>>> want to
> >>>>>>>>>>>> minimize the blast radius).
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4b. I might respectfully (and timidly) push back here,
> RECYCLED
> >>>>>>>> for an
> >>>>>>>>>>>> Active Task is a bit confusing to me. DEMOTED and MIGRATED
> make
> >>>>>>>> sense
> >>>>>>>>>>> from
> >>>>>>>>>>>> the standpoint of an Active Task, recycling to me sounds like
> >>>>>>>> throwing
> >>>>>>>>>>>> stuff away, such that the resources (i.e. disk space) can be
> >>>>>>>>>>>> used
> >>>>>>>> by a
> >>>>>>>>>>>> separate Task. As an alternative rather than trying to reuse
> >>>>>>>>>>>> the
> >>>>>>>> same
> >>>>>>>>>>> enum,
> >>>>>>>>>>>> maybe rename it to `StandbySuspendReason` to avoid naming
> >>>>>>>>>>>> conflicts
> >>>>>>>>>> with
> >>>>>>>>>>>> `ActiveSuspendReason`? However, I could be convinced to rename
> >>>>>>>> PROMOTED
> >>>>>>>>>>> ->
> >>>>>>>>>>>> RECYCLED, especially if Eduwer agrees.
> >>>>>>>>>>>>
> >>>>>>>>>>>> TLDR:
> >>>>>>>>>>>>
> >>>>>>>>>>>> T1. Agreed, will remove the word "Task" as it's incorrect.
> >>>>>>>>>>>> T2. Will update to `onStandbyUpdateStart()`
> >>>>>>>>>>>> T3. Awaiting further instructions on earliestOffset and
> >>>>>>>> startingOffset.
> >>>>>>>>>>>> T4. I don't like `onBatchProcessed()` too much, perhaps
> >>>>>>>>>>> `onBatchLoaded()`?
> >>>>>>>>>>>> T5. Will update to `onStandbyUpdateSuspended()`
> >>>>>>>>>>>> T6. Thoughts on renaming SuspendReason to
> StandbySuspendReason,
> >>>>>>>> rather
> >>>>>>>>>>> than
> >>>>>>>>>>>> renaming PROMOTED to RECYCLED? @Eduwer?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Long Live the Otter,
> >>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>
> >>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <
> >>>>>>>>>>> sop...@responsive.dev>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hey Colt! Thanks for the KIP -- this will be a great
> >>>>>>>>>>>>> addition to
> >>>>>>>>>>> Streams, I
> >>>>>>>>>>>>> can't believe we've gone so long without this.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Overall the proposal makes sense, but I had a handful of
> >>>>>>>>>>>>> fairly
> >>>>>>>> minor
> >>>>>>>>>>>>> questions and suggestions/requests
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Seems like the last sentence in the 2nd paragraph of the
> >>>>>>>>>> Motivation
> >>>>>>>>>>>>> section is cut off and incomplete -- "want to be able to
> >>>>>>>>>>>>> know "
> >>>>>>>> what
> >>>>>>>>>>>>> exactly?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. This isn't that important since the motivation as a
> >>>>>>>>>>>>> whole is
> >>>>>>>> clear
> >>>>>>>>>>> to me
> >>>>>>>>>>>>> and convincing enough, but I'm not quite sure I understand
> the
> >>>>>>>>>> example
> >>>>>>>>>>> at
> >>>>>>>>>>>>> the end of the Motivation section. How are standby tasks
> >>>>>>>>>>>>> (and the
> >>>>>>>>>>> ability
> >>>>>>>>>>>>> to hook into and monitor their status) related to the
> >>>>>>>>>>> session.timeout.ms
> >>>>>>>>>>>>> config?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. To help both old and new users of Kafka Streams understand
> >>>>>>>> this
> >>>>>>>>>> new
> >>>>>>>>>>>>> restore listener and its purpose/semantics, can we try to
> name
> >>>>>>>> the
> >>>>>>>>>>> class
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>    callbacks in a way that's more consistent with the
> >>>>>>>>>>>>> active task
> >>>>>>>>>> restore
> >>>>>>>>>>>>> listener?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3a. StandbyTaskUpdateListener:
> >>>>>>>>>>>>> The existing restore listener is called
> >>>>>>>>>>>>> StateRestoreListener, so
> >>>>>>>> the
> >>>>>>>>>>> new
> >>>>>>>>>>>>> one could be called something like
> >>>>>>>>>>>>> StandbyStateRestoreListener.
> >>>>>>>>>>> Although
> >>>>>>>>>>>>> we typically refer to standby tasks as "processing" rather
> >>>>>>>>>>>>> than
> >>>>>>>>>>> "restoring"
> >>>>>>>>>>>>> records -- ie restoration is a term for active task state
> >>>>>>>>>>> specifically. I
> >>>>>>>>>>>>> actually
> >>>>>>>>>>>>> like the original suggestion if we just drop the "Task"
> >>>>>>>>>>>>> part of
> >>>>>>>> the
> >>>>>>>>>>> name,
> >>>>>>>>>>>>> ie StandbyUpdateListener. I think either that or
> >>>>>>>>>> StandbyRestoreListener
> >>>>>>>>>>>>> would be fine and probably the two best options.
> >>>>>>>>>>>>> Also, this probably goes without saying but any change to the
> >>>>>>>> name of
> >>>>>>>>>>> this
> >>>>>>>>>>>>> class should of course be reflected in the
> KafkaStreams#setXXX
> >>>>>>>> API as
> >>>>>>>>>>> well
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3b. #onTaskCreated
> >>>>>>>>>>>>>    I know the "start" callback feels a bit different for the
> >>>>>>>> standby
> >>>>>>>>>> task
> >>>>>>>>>>>>> updater vs an active task beginning restoration, but I
> >>>>>>>>>>>>> think we
> >>>>>>>>>> should
> >>>>>>>>>>> try
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> keep the various callbacks aligned to their active restore
> >>>>>>>> listener
> >>>>>>>>>>>>> counterpart. We can/should just replace the term "restore"
> >>>>>>>>>>>>> with
> >>>>>>>>>>> "update"
> >>>>>>>>>>>>> for the
> >>>>>>>>>>>>> callback method names the same way we do for the class name,
> >>>>>>>> which in
> >>>>>>>>>>> this
> >>>>>>>>>>>>> case would give us #onUpdateStart. Personally I like this
> >>>>>>>>>>>>> better,
> >>>>>>>>>>>>> but it's ultimately up to you. However, I would push back
> >>>>>>>>>>>>> against
> >>>>>>>>>>> anything
> >>>>>>>>>>>>> that includes the word "Task" (eg #onTaskCreated) as the
> >>>>>>>>>>>>> listener
> >>>>>>>>>>>>>    is actually not scoped to the task itself but instead to
> >>>>>>>>>>>>> the
> >>>>>>>>>>> individual
> >>>>>>>>>>>>> state store(s). This is the main reason I would prefer
> >>>>>>>>>>>>> calling it
> >>>>>>>>>>> something
> >>>>>>>>>>>>> like #onUpdateStart, which keeps the focus on the store being
> >>>>>>>> updated
> >>>>>>>>>>>>> rather than the task that just happens to own this store
> >>>>>>>>>>>>> One last thing on this callback -- do we really need both the
> >>>>>>>>>>>>> `earliestOffset` and `startingOffset`? I feel like this
> >>>>>>>>>>>>> might be
> >>>>>>>> more
> >>>>>>>>>>>>> confusing than it
> >>>>>>>>>>>>> is helpful (tbh even I'm not completely sure I know what the
> >>>>>>>>>>> earliestOffset
> >>>>>>>>>>>>> is supposed to represent) More importantly, is this all
> >>>>>>>> information
> >>>>>>>>>>>>> that is already available and able to be passed in to the
> >>>>>>>> callback by
> >>>>>>>>>>>>> Streams? I haven't checked on this but it feels like the
> >>>>>>>>>>> earliestOffset is
> >>>>>>>>>>>>> likely to require a remote call, either by the embedded
> >>>>>>>>>>>>> consumer
> >>>>>>>> or
> >>>>>>>>>>> via the
> >>>>>>>>>>>>> admin client. If so, the ROI on including this parameter
> seems
> >>>>>>>>>>>>> quite low (if not outright negative)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3c. #onBatchRestored
> >>>>>>>>>>>>> If we opt to use the term "update" in place of "restore"
> >>>>>>>> elsewhere,
> >>>>>>>>>>> then we
> >>>>>>>>>>>>> should consider doing so here as well. What do you think
> about
> >>>>>>>>>>>>> #onBatchUpdated, or even #onBatchProcessed?
> >>>>>>>>>>>>> I'm actually not super concerned about this particular API,
> >>>>>>>>>>>>> and
> >>>>>>>>>>> honestly I
> >>>>>>>>>>>>> think we can use restore or update interchangeably here, so
> if
> >>>>>>>> you
> >>>>>>>>>>>>>    don't like any of the suggested names (and no one can
> >>>>>>>>>>>>> think of
> >>>>>>>>>>> anything
> >>>>>>>>>>>>> better), I would just stick with #onBatchRestored. In this
> >>>>>>>>>>>>> case,
> >>>>>>>>>>>>> it kind of makes the most sense.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3d. #onTaskSuspended
> >>>>>>>>>>>>> Along the same lines as 3b above, #onUpdateSuspended or just
> >>>>>>>>>>>>> #onRestoreSuspended probably makes more sense for this
> >>>>>>>>>>>>> callback.
> >>>>>>>>>> Also,
> >>>>>>>>>>>>>    I notice the StateRestoreListener passes in the total
> >>>>>>>>>>>>> number of
> >>>>>>>>>>> records
> >>>>>>>>>>>>> restored to its #onRestoreSuspended. Assuming we already
> track
> >>>>>>>>>>>>> that information in Streams and have it readily available to
> >>>>>>>> pass in
> >>>>>>>>>> at
> >>>>>>>>>>>>> whatever point we would be invoking this callback, that
> >>>>>>>>>>>>> might be
> >>>>>>>> a
> >>>>>>>>>>>>> useful  parameter for the standby listener to have as well
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. I totally love the SuspendReason thing, just two
> >>>>>>>> notes/requests:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4a. Feel free to push back against adding onto the scope of
> >>>>>>>>>>>>> this
> >>>>>>>> KIP,
> >>>>>>>>>>> but
> >>>>>>>>>>>>> it would be great to expand the active state restore listener
> >>>>>>>> with
> >>>>>>>>>> this
> >>>>>>>>>>>>> SuspendReason enum as well. It would be really useful for
> both
> >>>>>>>>>>> variants of
> >>>>>>>>>>>>> restore listener
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED --
> >>>>>>>>>>>>> for
> >>>>>>>>>> standby
> >>>>>>>>>>>>> tasks it means basically the same thing, the point is that
> >>>>>>>>>>>>> active
> >>>>>>>>>>>>> tasks can also be recycled into standbys through the same
> >>>>>>>> mechanism.
> >>>>>>>>>>> This
> >>>>>>>>>>>>> way they can share the SuspendReason enum -- not that it's
> >>>>>>>>>>>>> necessary for them to share, I just think it would be a good idea to
> >>>>>>>>>>>>> keep the two restore listeners aligned to the highest degree possible.
> >>>>>>>>>>>>> I was actually considering proposing a short KIP with a new
> >>>>>>>>>>>>> RecyclingListener (or something) specifically for this exact
> >>>>>>>> kind of
> >>>>>>>>>>> thing,
> >>>>>>>>>>>>> since we
> >>>>>>>>>>>>> currently have literally zero insight into the recycling
> >>>>>>>>>>>>> process.
> >>>>>>>>>> It's
> >>>>>>>>>>>>> practically impossible to tell when a store has been
> converted
> >>>>>>>> from
> >>>>>>>>>>> active
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> standby, or vice versa. So having access to the
> SuspendReason,
> >>>>>>>> and
> >>>>>>>>>> more
> >>>>>>>>>>>>> importantly having a callback guaranteed to notify you when a
> >>>>>>>>>>>>> state store is recycled whether active or standby, would be
> >>>>>>>> amazing.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> ---------- Forwarded message ---------
> >>>>>>>>>>>>>> From: Colt McNealy <c...@littlehorse.io>
> >>>>>>>>>>>>>> Date: Tue, Oct 3, 2023 at 12:48 PM
> >>>>>>>>>>>>>> Subject: [DISCUSS] KIP-988 Streams Standby Task Update
> >>>>>>>>>>>>>> Listener
> >>>>>>>>>>>>>> To: <dev@kafka.apache.org>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We would like to propose a small KIP to improve the
> >>>>>>>>>>>>>> ability of
> >>>>>>>>>>> Streams
> >>>>>>>>>>>>> apps
> >>>>>>>>>>>>>> to monitor the progress of their standby tasks through a
> >>>>>>>> callback
> >>>>>>>>>>>>>> interface.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We have a nearly-working implementation on our fork and are
> >>>>>>>> curious
> >>>>>>>>>>> for
> >>>>>>>>>>>>>> feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you,
> >>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *Founder, LittleHorse.dev*
> >>>>>>>>>>>>>>
