Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of the end offset.


-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:
Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything specific about `endOffset` guarantees:

Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been written to, the end offset is 0.

Thus, I actually believe that it would be ok to change the implementation and serve the answer from the `TopicPartitionState`?

Another idea would be to use `currentLag()` in combination with `position()` (or the offset of the last read record) to compute the end offset on the fly?
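
A minimal sketch of that arithmetic, assuming the lag reported by `currentLag()` is the end offset minus the current position. The class and method names are hypothetical; the arguments stand in for what `position()` and `currentLag()` would return, and no Kafka classes are used so the snippet runs on its own:

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not part of Kafka. */
final class EndOffsetEstimate {

    private EndOffsetEstimate() {}

    /**
     * Estimates the end offset as position + lag.
     *
     * @param position   stand-in for the value returned by position()
     * @param currentLag stand-in for the value returned by currentLag();
     *                   empty when the lag is not known
     */
    static OptionalLong fromPositionAndLag(long position, OptionalLong currentLag) {
        if (position < 0 || !currentLag.isPresent()) {
            // Without a valid position or a known lag there is nothing to compute.
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + currentLag.getAsLong());
    }

    /** The reverse direction: lag for a known end offset, never negative. */
    static long lagFromEndOffset(long position, long endOffset) {
        return Math.max(0L, endOffset - position);
    }
}
```

If the lag is unknown (for example when the end offset has not been fetched yet), the sketch returns an empty result rather than guessing.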


-Matthias

On 10/20/23 4:00 AM, Bruno Cadonna wrote:
Hi,

Matthias is correct that the end offsets are stored somewhere in the metadata of the consumer. More precisely, they are stored in the `TopicPartitionState`. However, I could not find a public API on the consumer other than `currentLag()` that uses the stored end offsets. If I understand the code correctly, the method `endOffsets()` always triggers a remote call.

I am a bit concerned about doing remote calls every commit.interval.ms (by default 100 ms under EOS). At the moment the remote calls are only issued if an optimization for KTables is turned on where changelog topics are replaced with the input topic of the KTable. The current remote calls retrieve all committed offsets of the group at once. If I understand correctly, that is one single remote call. Remote calls for getting end offsets of changelog topics -- as I understand you are planning to issue -- will probably result in multiple remote calls to multiple leaders of the changelog topic partitions.

Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose, as part of the implementation of this KIP, to modify the consumer so that it gets the end offset from the locally stored metadata whenever possible. I do not know what the implications of such a change to the consumer are, or whether a KIP is needed for it. Maybe `endOffsets()` guarantees to return the freshest end offsets possible, which the modification would no longer satisfy.

Regarding the naming, I do not completely agree with Matthias. While the pattern might be consistent with onBatchUpdated, what is the meaning of onBatchUpdated? Is the batch updated? The names onBatchLoaded, onBatchWritten, or onBatchAdded are clearer IMO. With "restore" the pattern works better: if I restore a batch of records in a state, the records are not there although they should be there, and I add them. If I update a batch of records in a state, it sounds like the batch of records is already in the state and I modify the existing records within the state. That is clearly not the meaning of the event for which the listener should be called.

Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:
Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, except for `onBatchLoaded`; I would prefer `onBatchUpdated`. It aligns better with everything else:

  - it's an update-listener, not a loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, `onRestoreSuspended`, and `onBatchRestored` (it's very consistent)
  - `StandbyUpdateListener` should have `onUpdateStart`, `onUpdateSuspended` and `onBatchUpdated` to be equally consistent (using "loaded" breaks the pattern)


About the end-offset question: I am relatively sure that the consumer gets the latest end-offset as attached metadata in every fetch response. (We exploit this behavior to track end-offsets for input topic with regard to `max.task.idle.ms` without overhead -- it was also a concern when we did the corresponding KIP how we could track lag with no overhead).

Thus, I believe we would "just" need to modify the code accordingly to get this information from the restore-consumer for free (`restoreConsumer.endOffsets(...)` should be served w/o RPC but from the internal metadata cache), and pass it into the listener.

Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:
Hi Bruno,

Thanks for your observation; surely it will require a network call using the admin client in order to know this "endOffset", and that will have an impact on performance. Ideally we would find a solution with zero impact on performance, or at least a low one; unfortunately, I don't see a way to have zero impact. However, we can leverage the existing #maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin client to ask for these "endOffset"s. As far as I can understand, this update is done periodically using the "commit.interval.ms" configuration. I believe this option will force us to invoke StandbyUpdateListener once this interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna <cado...@apache.org> wrote:

Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:
Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy <c...@littlehorse.io> wrote:

Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:

Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy <c...@littlehorse.io> wrote:

Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on, so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.
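
To make the summary concrete, here is a rough sketch of how the listener could look after these three changes. It is only an illustration: the method names and the `SuspendReason` values come from this thread, while the parameter lists, the use of a plain `String` for the changelog partition, and the `RecordingStandbyListener` class are assumptions and not the final KIP interface:

```java
import java.util.ArrayList;
import java.util.List;

/** Why a standby stopped updating; the two values kept in point 2. */
enum SuspendReason { PROMOTED, MIGRATED }

/** Hypothetical shape of the listener; all parameter lists are assumptions. */
interface StandbyUpdateListener {
    // No earliestOffset parameter, per point 3.
    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);

    void onBatchLoaded(String changelogPartition, String storeName,
                       long batchEndOffset, long batchSize, long currentEndOffset);

    void onUpdateSuspended(String changelogPartition, String storeName,
                           long storeOffset, long currentEndOffset, SuspendReason reason);
}

/** Example implementation that records each event as a line of text. */
final class RecordingStandbyListener implements StandbyUpdateListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void onUpdateStart(String changelogPartition, String storeName, long startingOffset) {
        events.add("start " + storeName + " at " + startingOffset);
    }

    @Override
    public void onBatchLoaded(String changelogPartition, String storeName,
                              long batchEndOffset, long batchSize, long currentEndOffset) {
        // Remaining lag of the standby after this batch, never negative.
        long lag = Math.max(0L, currentEndOffset - batchEndOffset);
        events.add("loaded " + batchSize + " into " + storeName + ", lag " + lag);
    }

    @Override
    public void onUpdateSuspended(String changelogPartition, String storeName,
                                  long storeOffset, long currentEndOffset, SuspendReason reason) {
        events.add("suspended " + storeName + " at " + storeOffset + " (" + reason + ")");
    }
}
```

An application would register one such listener and use the recorded events (or metrics derived from them) to watch how far each standby store lags behind its changelog.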

If that's all fine with everyone, I'll update the KIP and we -- well, mostly Edu (: -- will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro <edu...@littlehorse.io> wrote:

Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is understanding the terms used inside the StoreChangelogReader. Currently, this class has two possible states: ACTIVE_RESTORING and STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the interface fits better with these terms. The same applies to onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task restoration and standby task updates, but this is an implementation detail. Under normal circumstances (no rebalances or task migrations), the changelog reader will be in STANDBY_UPDATING, which means it will be updating standby tasks as long as there are new records in the changelog topic. That's why I prefer onStandbyUpdated instead of onBatchUpdated, even if it doesn't 100% align with StateRestoreListener, but either one is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:

Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and overall it looks great. I only have minor naming comments (well, isn't naming the least boring stuff to discuss, and the part that takes most of the time for KIPs :P):

1. I tend to agree with Sophie regarding whether or not to include "Standby" in the functions of "onStandbyUpdateStart/Suspended", since it is also more consistent with the functions of "StateRestoreListener" where we do not name it as "onStateRestoreState" etc.

2. I know in community discussions we sometimes say "a standby is promoted to active", but in the official code / java docs we did not have a term of "promotion", since what the code does is really recycle the task (while keeping its state stores open), and create a new active task that takes in the recycled state stores and just changes the other fields like task type etc. After thinking about this for a bit, I tend to feel that "promoted" is indeed a better name for user facing purposes while "recycle" is more of a technical detail inside the code and could be abstracted away from users. So I feel keeping the name "PROMOTED" is fine.

3. Regarding "earliestOffset", it does feel like we cannot always avoid another call to the Kafka API. And on the other hand, I also tend to think that such bookkeeping may be better done at the app level than from the Streams' public API level. I.e. the app could keep a "first ever starting offset" per "topic-partition-store" key, and when we have a rolling restart and hence some standby task keeps "jumping" from one client to another via task assignment, the app would update this value just once, when it finds the "topic-partition-store" key was never triggered before. What do you think?

4. I do not have a strong opinion either, but what about "onBatchUpdated"?

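The app-level bookkeeping from point 3 could look roughly like the following. This is only a sketch under stated assumptions: the class name, method name, and key format are hypothetical, and the map lives in a single process, so an app whose standby tasks jump between clients would need some shared storage instead, which is left out here:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical app-level bookkeeping; not part of the Streams API. */
final class FirstStartingOffsets {

    // Key: "topic-partition-store", value: first starting offset ever seen.
    private final Map<String, Long> firstOffsets = new ConcurrentHashMap<>();

    /**
     * Stores the offset only the first time a key is seen and returns the
     * value that is kept, so later calls never overwrite the first one.
     */
    long recordIfFirst(String topic, int partition, String storeName, long startingOffset) {
        String key = topic + "-" + partition + "-" + storeName;
        Long previous = firstOffsets.putIfAbsent(key, startingOffset);
        return previous == null ? startingOffset : previous;
    }
}
```

The app would call `recordIfFirst(...)` from its "update start" callback; the difference between a later starting offset and the stored first one then shows how much state was already present when the standby was reassigned.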

Guozhang

On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy <c...@littlehorse.io> wrote:

Sophie,

Thank you very much for such a detailed review of the KIP. It might actually be longer than the original KIP in the first place!

1. Ack'ed and fixed.

2. Correct, this is a confusing passage and requires context:

One thing on our list of TODO's regarding reliability is to determine how to configure `session.timeout.ms`. In our Kubernetes Environment, an instance of our Streams App can be terminated, restarted, and get back into the "RUNNING" Streams state in about 20 seconds. We have two options here:

a) set session.timeout.ms to 30 seconds or so, and deal with 20 seconds of unavailability for affected partitions, but avoid shuffling Tasks; or
b) set session.timeout.ms to a low value, such as 6 seconds (heartbeat.interval.ms of 2000), and reduce the unavailability window during a rolling bounce but incur an "extra" rebalance.

There are several different costs to a rebalance, including the shuffling of standby tasks. JMX metrics are not fine-grained enough to give us an accurate picture of what's going on with the whole Standby Task Shuffle Dance. I hypothesize that the Standby Update Listener might help us clarify just how the shuffling actually (not theoretically) works, which will help us make a more informed decision about the session timeout config.

If you think this is worth putting in the KIP, I'll polish it and do so; else, I'll remove the current half-baked explanation.

3. Overall, I agree with this. In our app, each Task has only one Store to reduce the number of changelog partitions, so I sometimes forget the distinction between the two concepts, as reflected in the KIP (:

3a. I don't like the word "Restore" here, since Restoration refers to an Active Task getting caught up in preparation to resume processing. `StandbyUpdateListener` is fine by me; I have updated the KIP. I am a native Python speaker so I do prefer shorter names anyways (:

3b1. +1 to removing the word 'Task'.

3b2. I like `onUpdateStart()`, but with your permission I'd prefer `onStandbyUpdateStart()` which matches the name of the Interface "StandbyUpdateListener". (the python part of me hates this, however)

3b3. Going back to question 2), `earliestOffset` was intended to allow us to more easily calculate the amount of state _already loaded_ in the store by subtracting (startingOffset - earliestOffset). This would help us see how much inefficiency is introduced in a rolling restart -- if we end up going from a situation with an up-to-date standby before the restart, and then after the whole restart, the Task is shuffled onto an instance where there is no previous state, then that is expensive. However, if the final shuffling results in the Task back on an instance with a lot of pre-built state, it's not expensive.

If a call over the network is required to determine the earliestOffset, then this is a "hard no-go" for me, and we will remove it (I'll have to check with Eduwer as he is close to having a working implementation). I think we can probably determine what we wanted to see in a different way, but it will take more thinking. If `earliestOffset` is confusing, perhaps rename it to `earliestChangelogOffset`?

`startingOffset` is easy to remove as it can be determined from the first call to `onBatch{Restored/Updated/Processed/Loaded}()`.

Anyways, I've updated the JavaDoc in the interface; hopefully it's more clear. Awaiting further instructions here.

3c. Good point; after thinking, my preference is `onBatchLoaded()` -> `onBatchUpdated()` -> `onBatchProcessed()` -> `onBatchRestored()`. I am less fond of "processed" because when I was first learning Streams I mistakenly thought that standby tasks actually processed the input topic rather than loaded from the changelog. I'll defer to you here.

3d. +1 to `onUpdateSuspended()`, or better yet `onStandbyUpdateSuspended()`. Will check about the implementation of keeping track of the number of records loaded.

4a. I think this might be best in a separate KIP, especially given that this is my and Eduwer's first time contributing to Kafka (so we want to minimize the blast radius).

4b. I might respectfully (and timidly) push back here: RECYCLED for an Active Task is a bit confusing to me. DEMOTED and MIGRATED make sense from the standpoint of an Active Task; recycling to me sounds like throwing stuff away, such that the resources (i.e. disk space) can be used by a separate Task. As an alternative, rather than trying to reuse the same enum, maybe rename it to `StandbySuspendReason` to avoid naming conflicts with `ActiveSuspendReason`? However, I could be convinced to rename PROMOTED -> RECYCLED, especially if Eduwer agrees.

TLDR:

T1. Agreed, will remove the word "Task" as it's incorrect.
T2. Will update to `onStandbyUpdateStart()`
T3. Awaiting further instructions on earliestOffset and startingOffset.
T4. I don't like `onBatchProcessed()` too much, perhaps `onBatchLoaded()`?
T5. Will update to `onStandbyUpdateSuspended()`
T6. Thoughts on renaming SuspendReason to StandbySuspendReason, rather than renaming PROMOTED to RECYCLED? @Eduwer?

Long Live the Otter,
Colt McNealy

*Founder, LittleHorse.dev*


On Wed, Oct 11, 2023 at 9:32 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Hey Colt! Thanks for the KIP -- this will be a great addition to Streams, I can't believe we've gone so long without this.

Overall the proposal makes sense, but I had a handful of fairly minor questions and suggestions/requests:

1. Seems like the last sentence in the 2nd paragraph of the Motivation section is cut off and incomplete -- "want to be able to know " what exactly?

2. This isn't that important since the motivation as a whole is clear to me and convincing enough, but I'm not quite sure I understand the example at the end of the Motivation section. How are standby tasks (and the ability to hook into and monitor their status) related to the session.timeout.ms config?

3. To help both old and new users of Kafka Streams understand this new restore listener and its purpose/semantics, can we try to name the class and callbacks in a way that's more consistent with the active task restore listener?

3a. StandbyTaskUpdateListener:
The existing restore listener is called StateRestoreListener, so the new one could be called something like StandbyStateRestoreListener. Although we typically refer to standby tasks as "processing" rather than "restoring" records -- ie restoration is a term for active task state specifically. I actually like the original suggestion if we just drop the "Task" part of the name, ie StandbyUpdateListener. I think either that or StandbyRestoreListener would be fine and probably the two best options.
Also, this probably goes without saying but any change to the name of this class should of course be reflected in the KafkaStreams#setXXX API as well.

3b. #onTaskCreated
I know the "start" callback feels a bit different for the standby task updater vs an active task beginning restoration, but I think we should try to keep the various callbacks aligned to their active restore listener counterpart. We can/should just replace the term "restore" with "update" for the callback method names the same way we do for the class name, which in this case would give us #onUpdateStart. Personally I like this better, but it's ultimately up to you. However, I would push back against anything that includes the word "Task" (eg #onTaskCreated) as the listener is actually not scoped to the task itself but instead to the individual state store(s). This is the main reason I would prefer calling it something like #onUpdateStart, which keeps the focus on the store being updated rather than the task that just happens to own this store.
One last thing on this callback -- do we really need both the `earliestOffset` and `startingOffset`? I feel like this might be more confusing than it is helpful (tbh even I'm not completely sure I know what the earliestOffset is supposed to represent). More importantly, is this all information that is already available and able to be passed in to the callback by Streams? I haven't checked on this but it feels like the earliestOffset is likely to require a remote call, either by the embedded consumer or via the admin client. If so, the ROI on including this parameter seems quite low (if not outright negative).

3c. #onBatchRestored
If we opt to use the term "update" in place of "restore" elsewhere, then we should consider doing so here as well. What do you think about #onBatchUpdated, or even #onBatchProcessed?
I'm actually not super concerned about this particular API, and honestly I think we can use restore or update interchangeably here, so if you don't like any of the suggested names (and no one can think of anything better), I would just stick with #onBatchRestored. In this case, it kind of makes the most sense.

3d. #onTaskSuspended
Along the same lines as 3b above, #onUpdateSuspended or just #onRestoreSuspended probably makes more sense for this callback. Also, I notice the StateRestoreListener passes in the total number of records restored to its #onRestoreSuspended. Assuming we already track that information in Streams and have it readily available to pass in at whatever point we would be invoking this callback, that might be a useful parameter for the standby listener to have as well.

4. I totally love the SuspendReason thing, just two notes/requests:

4a. Feel free to push back against adding onto the scope of this KIP, but it would be great to expand the active state restore listener with this SuspendReason enum as well. It would be really useful for both variants of restore listener.

4b. Assuming we do 4a, let's rename PROMOTED to RECYCLED -- for standby tasks it means basically the same thing, the point is that active tasks can also be recycled into standbys through the same mechanism. This way they can share the SuspendReason enum -- not that it's necessary for them to share, I just think it would be a good idea to keep the two restore listeners aligned to the highest degree possible.
I was actually considering proposing a short KIP with a new RecyclingListener (or something) specifically for this exact kind of thing, since we currently have literally zero insight into the recycling process. It's practically impossible to tell when a store has been converted from active to standby, or vice versa. So having access to the SuspendReason, and more importantly having a callback guaranteed to notify you when a state store is recycled whether active or standby, would be amazing.

Thanks for the KIP!

-Sophie "otterStandbyTaskUpdateListener :P" Blee-Goldman


---------- Forwarded message ---------
From: Colt McNealy <c...@littlehorse.io>
Date: Tue, Oct 3, 2023 at 12:48 PM
Subject: [DISCUSS] KIP-988 Streams Standby Task Update Listener
To: <dev@kafka.apache.org>


Hi all,

We would like to propose a small KIP to improve the ability of Streams apps to monitor the progress of their standby tasks through a callback interface.

We have a nearly-working implementation on our fork and are curious for feedback.







https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener

Thank you,
Colt McNealy

*Founder, LittleHorse.dev*






