Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-17 Thread Nick Telford
Hi everyone,

As discussed on the Zoom call, we're going to handle rebalance meta-data by:

- On start-up, Streams will open each store and read its changelog offsets
into an in-memory cache. This cache will be shared among all StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any Task
that is not active on any instance-local StreamThread. If the Task is
active on *any* instance-local StreamThread, we will report the Task lag as
"up to date" (i.e. -1), because we know that the local state is currently
up-to-date.

We will avoid caching offsets across restarts in the legacy ".checkpoint"
file, so that we can eliminate the logic that handles it. If
performance of opening/closing many state stores is poor, we can
parallelise it by forking off a thread for each Task directory when reading
the offsets.
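
For illustration, a rough sketch of that start-up pass (class and helper names
here are hypothetical, not the actual Streams internals) could look something
like this:

import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

public class StartupOffsetCache {

    // shared among all StreamThreads and consulted during rebalances
    private final Map<TaskId, Map<TopicPartition, Long>> cache = new ConcurrentHashMap<>();

    public void populate(final File stateDir) {
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return;
        }
        // one pass per Task directory; .parallel() is the optional
        // "thread per Task directory" optimisation mentioned above
        Arrays.stream(taskDirs)
              .parallel()
              .forEach(taskDir -> {
                  final TaskId taskId = TaskId.parse(taskDir.getName());
                  // placeholder: open the Task's stores, read their changelog
                  // offsets via the store API, then close them again
                  cache.put(taskId, openStoresAndReadOffsets(taskDir));
              });
    }

    public Map<TopicPartition, Long> offsetsFor(final TaskId taskId) {
        return cache.get(taskId);
    }

    private Map<TopicPartition, Long> openStoresAndReadOffsets(final File taskDir) {
        return Map.of(); // placeholder for the real store-opening logic
    }
}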

I'll update the KIP later today to reflect this design, but I will try to
keep it high-level, so that the exact implementation can vary.

Regards,

Nick

On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman 
wrote:

> 103: I like the idea of immediately deprecating #managesOffsets and aiming
> to make offset management mandatory in the long run. I assume we would also
> log a warning for any custom stores that return "false" from this method to
> encourage custom store implementations to start doing so? My only
> question/concern is that if we want folks to start managing their own
> offsets then we should make this transition easy for them, perhaps by
> exposing some public utility APIs for things that are currently handled by
> Kafka Streams such as reading/writing checkpoint files. Maybe it would be
> useful to include a small example in the KIP of what it would actually mean
> to "manage your own offsets" -- I know (all too well) that plugging in
> custom storage implementations is not easy and most people who do this are
> probably fairly advanced users, but offset management will be a totally new
> ballgame to most people, and this kind of feels like throwing them
> off the deep end. We should at least provide a lifejacket via some kind of
> utility API and/or example
>
> 200. There's been a lot of back and forth on the rebalance metadata/task
> lag computation question, so forgive me if I missed any part of this, but I
> think we've landed at the right idea here. To summarize: the "tl;dr"
> explanation is that we'll write the checkpoint file only on close and will
> account for hard-crash scenarios by opening up the stores on startup and
> writing a checkpoint file for any missing tasks. Does that sound about
> right?
>
> A few clarifications:
> I think we're all more or less on the same page here but just to be
> absolutely clear, the task lags for each task directory found on disk will
> be reported by only one of the StreamThreads, and each StreamThread will
> report lags only for tasks that it already owns or that are not assigned to any
> other StreamThread in the client. In other words, we only need to get the
> task lag for completely unassigned/unlocked tasks, which means if there is
> a checkpoint file at all then it must be up-to-date, because there is no
> other StreamThread actively writing to that state store (if so then only
> that StreamThread would report lag for that particular task).
>
> This still leaves the "no checkpoint at all" case which as previously
> mentioned can occur after a hard crash. Luckily we only have to worry
> about this once, after starting up again following said hard crash. We can
> simply open up each of the state stores before ever joining the group, get
> the offsets from rocksdb, and write them to a new checkpoint file. After
> that, we can depend on the checkpoints written at close and won't have to
> open up any stores that aren't already assigned for the reasons laid out in
> the paragraph above.
>
> As for the specific mechanism and which thread-does-what, since there were
> some questions, this is how I'm imagining the process:
>
>1.   The general idea is that we simply go through each task directory
>with state but no checkpoint file and open the StateStore, call
>#committedOffset, and then write it to the checkpoint file. We can then
>close these stores and let things proceed as normal.
>2.  This only has to happen once, during startup, but we have two
>options:
>   1. Do this from KafkaStreams#start, ie before we even create the
>   StreamThreads
>   2.  Do this from StreamThread#start, following a similar lock-based
>   approach to the one used #computeTaskLags, where each StreamThread
> just
>   makes a pass over the task directories on disk and attempts to lock
> them
>   one by one. If they obtain the lock, check whether there is state
> but no
>   checkpoint, and write the checkpoint if needed. If it can't grab
> the lock,
>   then we know one of the other StreamThreads must be handling the
> checkpoint
>   file for that task directory, and we can move on.

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-15 Thread Sophie Blee-Goldman
103: I like the idea of immediately deprecating #managesOffsets and aiming
to make offset management mandatory in the long run. I assume we would also
log a warning for any custom stores that return "false" from this method to
encourage custom store implementations to start doing so? My only
question/concern is that if we want folks to start managing their own
offsets then we should make this transition easy for them, perhaps by
exposing some public utility APIs for things that are currently handled by
Kafka Streams such as reading/writing checkpoint files. Maybe it would be
useful to include a small example in the KIP of what it would actually mean
to "manage your own offsets" -- I know (all too well) that plugging in
custom storage implementations is not easy and most people who do this are
probably fairly advanced users, but offset management will be a totally new
ballgame to most people, and this kind of feels like throwing them
off the deep end. We should at least provide a lifejacket via some kind of
utility API and/or example
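
To make that concrete, here is a very rough sketch of what "managing your own
offsets" could look like for a custom store, assuming the
commit(Map<TopicPartition, Long>) / committedOffset(TopicPartition) shape from
the KIP draft; the atomicWriteDataAndOffsets helper is hypothetical and stands
in for whatever transactional write the store engine supports:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public abstract class OffsetManagingStore /* would implement StateStore */ {

    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    public boolean managesOffsets() {
        return true; // opt in to store-managed offsets
    }

    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // persist the offsets atomically with the data they describe, e.g. in
        // the same transaction/batch as the buffered writes
        atomicWriteDataAndOffsets(changelogOffsets);
        committedOffsets.putAll(changelogOffsets);
    }

    public Long committedOffset(final TopicPartition partition) {
        // must be answerable as soon as the store is open, i.e. loaded from
        // the store's own storage during init()
        return committedOffsets.get(partition);
    }

    // hypothetical: whatever atomic/transactional write the custom engine offers
    protected abstract void atomicWriteDataAndOffsets(Map<TopicPartition, Long> offsets);
}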

200. There's been a lot of back and forth on the rebalance metadata/task
lag computation question, so forgive me if I missed any part of this, but I
think we've landed at the right idea here. To summarize: the "tl;dr"
explanation is that we'll write the checkpoint file only on close and will
account for hard-crash scenarios by opening up the stores on startup and
writing a checkpoint file for any missing tasks. Does that sound about
right?

A few clarifications:
I think we're all more or less on the same page here but just to be
absolutely clear, the task lags for each task directory found on disk will
be reported by only one of the StreamThreads, and each StreamThread will
report lags only for tasks that it already owns or that are not assigned to any
other StreamThread in the client. In other words, we only need to get the
task lag for completely unassigned/unlocked tasks, which means if there is
a checkpoint file at all then it must be up-to-date, because there is no
other StreamThread actively writing to that state store (if so then only
that StreamThread would report lag for that particular task).

This still leaves the "no checkpoint at all" case which as previously
mentioned can occur after a hard crash. Luckily we only have to worry
about this once, after starting up again following said hard crash. We can
simply open up each of the state stores before ever joining the group, get
the offsets from rocksdb, and write them to a new checkpoint file. After
that, we can depend on the checkpoints written at close and won't have to
open up any stores that aren't already assigned for the reasons laid out in
the paragraph above.

As for the specific mechanism and which thread-does-what, since there were
some questions, this is how I'm imagining the process:

   1.   The general idea is that we simply go through each task directory
   with state but no checkpoint file and open the StateStore, call
   #committedOffset, and then write it to the checkpoint file. We can then
   close these stores and let things proceed as normal.
   2.  This only has to happen once, during startup, but we have two
   options:
  1. Do this from KafkaStreams#start, ie before we even create the
  StreamThreads
  2.  Do this from StreamThread#start, following a similar lock-based
   approach to the one used in #computeTaskLags, where each StreamThread just
  makes a pass over the task directories on disk and attempts to lock them
  one by one. If they obtain the lock, check whether there is state but no
  checkpoint, and write the checkpoint if needed. If it can't grab
the lock,
  then we know one of the other StreamThreads must be handling the
checkpoint
  file for that task directory, and we can move on.
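
A rough sketch of that second option, with hypothetical helper names (the real
StateDirectory / checkpoint-file internals may differ):

// runs once per StreamThread during startup, before joining the group
for (final TaskId taskId : stateDirectory.nonEmptyTaskDirectories()) {
    if (!stateDirectory.lock(taskId)) {
        continue; // another StreamThread is handling this Task directory
    }
    try {
        final File checkpointFile = stateDirectory.checkpointFileFor(taskId);
        if (!checkpointFile.exists()) {
            // open the store(s), read their committed offsets, write the checkpoint
            final Map<TopicPartition, Long> offsets = openStoresAndReadOffsets(taskId);
            writeCheckpointFile(checkpointFile, offsets);
            closeStores(taskId);
        }
    } finally {
        stateDirectory.unlock(taskId);
    }
}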

Don't really feel too strongly about which approach is best; doing it in
KafkaStreams#start is certainly the simplest, while doing it in the
StreamThread's startup is more efficient. If we're worried about adding too
much weight to KafkaStreams#start then the 2nd option is probably best,
though slightly more complicated.

Thoughts?

On Tue, May 14, 2024 at 10:02 AM Nick Telford 
wrote:

> Hi everyone,
>
> Sorry for the delay in replying. I've finally now got some time to work on
> this.
>
> Addressing Matthias's comments:
>
> 100.
> Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator
> which we could leverage to provide that protection. I'll add details on
> this to the KIP.
>
> 101,102.
> It looks like these points have already been addressed by Bruno. Let me
> know if anything here is still unclear or you feel needs to be detailed
> more in the KIP.
>
> 103.
> I'm in favour of anything that gets the old code removed sooner, but
> wouldn't deprecating an API that we expect (some) users to implement cause
> problems?
> I'm thinking about implementers of custom StateStores, as they may be
> confused by managesOffsets() being deprecated, 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-14 Thread Nick Telford
Hi everyone,

Sorry for the delay in replying. I've finally now got some time to work on
this.

Addressing Matthias's comments:

100.
Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator
which we could leverage to provide that protection. I'll add details on
this to the KIP.

101,102.
It looks like these points have already been addressed by Bruno. Let me
know if anything here is still unclear or you feel needs to be detailed
more in the KIP.

103.
I'm in favour of anything that gets the old code removed sooner, but
wouldn't deprecating an API that we expect (some) users to implement cause
problems?
I'm thinking about implementers of custom StateStores, as they may be
confused by managesOffsets() being deprecated, especially since they would
have to mark their implementation as @Deprecated in order to avoid compile
warnings.
If deprecating an API *while it's still expected to be implemented* is
something that's generally done in the project, then I'm happy to do so
here.

104.
I think this is technically possible, but at the cost of considerable
additional code to maintain. Would we ever have a pathway to remove this
downgrade code in the future?


Regarding rebalance metadata:
Opening all stores on start-up to read and cache their offsets is an
interesting idea, especially if we can avoid re-opening the stores once the
Tasks have been assigned. Scalability shouldn't be too much of a problem,
because typically users have a fairly short state.cleanup.delay, so the
number of on-disk Task directories should rarely exceed the number of Tasks
previously assigned to that instance.
An advantage of this approach is that it would also simplify StateStore
implementations, as they would only need to guarantee that committed
offsets are available when the store is open.

I'll investigate this approach this week for feasibility and report back.

I think that covers all the outstanding feedback, unless I missed anything?

Regards,
Nick

On Mon, 6 May 2024 at 14:06, Bruno Cadonna  wrote:

> Hi Matthias,
>
> I see what you mean.
>
> To sum up:
>
> With this KIP the .checkpoint file is written when the store closes.
> That is when:
> 1. a task moves away from the Kafka Streams client
> 2. the Kafka Streams client shuts down
>
> A Kafka Streams client needs the information in the .checkpoint file
> 1. on startup because it does not have any open stores yet.
> 2. during rebalances for non-empty state directories of tasks that are
> not assigned to the Kafka Streams client.
>
> With hard crashes, i.e., when the Streams client is not able to close
> its state stores and write the .checkpoint file, the .checkpoint file
> might be quite stale. That influences the next rebalance after failover
> negatively.
>
>
> My conclusion is that Kafka Streams either needs to open the state
> stores at start up or write the checkpoint file more often.
>
> Writing the .checkpoint file during processing more often without
> controlling the flush to disk would work. However, Kafka Streams would
> checkpoint offsets that are not yet persisted on disk by the state
> store. That is with a hard crash the offsets in the .checkpoint file
> might be larger than the offsets checkpointed in the state store. That
> might not be a problem if Kafka Streams uses the .checkpoint file only
> to compute the task lag. The downside is that it makes the managing of
> checkpoints more complex because now we have to maintain two
> checkpoints: one for restoration and one for computing the task lag.
> I think we should explore the option where Kafka Streams opens the state
> stores at start up to get the offsets.
>
> I also checked when Kafka Streams needs the checkpointed offsets to
> compute the task lag during a rebalance. Turns out Kafka Streams needs
> them before sending the join request. Now, I am wondering if opening the
> state stores of unassigned tasks whose state directory exists locally is
> actually such a big issue with respect to the expected higher latency, since
> it happens before the Kafka Streams client joins the rebalance.
>
> Best,
> Bruno
>
>
>
>
>
>
>
> On 5/4/24 12:05 AM, Matthias J. Sax wrote:
> > Those are good questions... I could think of a few approaches, but I admit
> > it might all be a little bit tricky to code up...
> >
> > However if we don't solve this problem, I think this KIP does not really
> > solve the core issue we are facing? In the end, if we rely on the
> > `.checkpoint` file to compute a task assignment, but the `.checkpoint`
> > file can be arbitrarily stale after a crash because we only write it on a
> > clean close, there would be still a huge gap that this KIP does not
> close?
> >
> > For the case in which we keep the checkpoint file, this KIP would still
> > help for "soft errors" in which KS can recover, and roll back the store.
> > A significant win for sure. -- But hard crashes would still be a
> > problem? We might assign tasks to the "wrong" instance, ie, one which is not
> > most up to date, as the checkpoint information could be very outdated?

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-06 Thread Bruno Cadonna

Hi Matthias,

I see what you mean.

To sum up:

With this KIP the .checkpoint file is written when the store closes. 
That is when:

1. a task moves away from the Kafka Streams client
2. the Kafka Streams client shuts down

A Kafka Streams client needs the information in the .checkpoint file
1. on startup because it does not have any open stores yet.
2. during rebalances for non-empty state directories of tasks that are 
not assigned to the Kafka Streams client.


With hard crashes, i.e., when the Streams client is not able to close 
its state stores and write the .checkpoint file, the .checkpoint file 
might be quite stale. That influences the next rebalance after failover 
negatively.



My conclusion is that Kafka Streams either needs to open the state 
stores at start up or write the checkpoint file more often.


Writing the .checkpoint file during processing more often without 
controlling the flush to disk would work. However, Kafka Streams would 
checkpoint offsets that are not yet persisted on disk by the state 
store. That is with a hard crash the offsets in the .checkpoint file 
might be larger than the offsets checkpointed in the state store. That 
might not be a problem if Kafka Streams uses the .checkpoint file only 
to compute the task lag. The downside is that it makes the managing of 
checkpoints more complex because now we have to maintain two 
checkpoints: one for restoration and one for computing the task lag.
I think we should explore the option where Kafka Streams opens the state 
stores at start up to get the offsets.


I also checked when Kafka Streams needs the checkpointed offsets to 
compute the task lag during a rebalance. Turns out Kafka Streams needs 
them before sending the join request. Now, I am wondering if opening the 
state stores of unassigned tasks whose state directory exists locally is 
actually such a big issue with respect to the expected higher latency, since
it happens before the Kafka Streams client joins the rebalance.


Best,
Bruno







On 5/4/24 12:05 AM, Matthias J. Sax wrote:
Those are good questions... I could think of a few approaches, but I admit
it might all be a little bit tricky to code up...


However if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrarily stale after a crash because we only write it on a
clean close, there would be still a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be a
problem? We might assign tasks to the "wrong" instance, ie, one which is not
most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the end, for soft failures it's still
a win. Just want to make sure we understand the limitations and make an 
educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is that we don't need to flush the
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something worth calling out explicitly in the KIP
writeup. -- Now that I realize that the position is tracked inside 
the store (not outside as the changelog offsets) it makes much more 
sense to pull position into RocksDB itself. In the end, it's actually 
a "store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could open all stores we find on local
disk pro-actively, and keep them all open until the first rebalance 
finishes: For tasks we get assigned, we 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
Those are good questions... I could think of a few approaches, but I admit
it might all be a little bit tricky to code up...


However if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrarily stale after a crash because we only write it on a
clean close, there would be still a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be a
problem? We might assign tasks to the "wrong" instance, ie, one which is not
most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the end, for soft failures it's still
a win. Just want to make sure we understand the limitations and make an 
educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is that we don't need to flush the
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something worth calling out explicitly in the KIP
writeup. -- Now that I realize that the position is tracked inside the 
store (not outside as the changelog offsets) it makes much more sense 
to pull position into RocksDB itself. In the end, it's actually a 
"store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could open all stores we find on local
disk pro-actively, and keep them all open until the first rebalance 
finishes: For tasks we get assigned, we hand in the already opened 
store (this would amortize the cost to open the store before the 
rebalance) and for non-assigned tasks, we know the offset information 
won't change and we could just cache it in-memory for later reuse (ie, 
next rebalance) and close the store to free up resources? -- Assuming 
that we would get a large percentage of opened stores assigned as 
tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persists the
memtable, then the position in the .position file is ahead of the
persisted offset. If IQ is done between the crash and the state store
fully restoring the changelog, the position might tell IQ that the
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same 
as persisting offset, the position should always be consistent with 
the offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the 
position into the implementation of the StateStore interface since 
the position is updated within the implementation of the StateStore 
interface (e.g. RocksDBStore [1]). My statement describes the 
behavior now, not the change proposed in this KIP, so it does not 
contradict what is stated in the KIP.



200:
This is about Matthias' main concern about 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is that we don't need to flush the
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something worth calling out explicitly in the KIP writeup. --
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could open all stores we find on local disk
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could 
work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persists the
memtable, then the position in the .position file is ahead of the
persisted offset. If IQ is done between the crash and the state store
fully restoring the changelog, the position might tell IQ that the
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated 
in the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the 
offsets managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position 
is the offset from the upstream source topic, right? -- In the end, 
the position is about IQ, and if we fail to update it, it only means 
that there is some gap when we might not be able to query a standby 
task, because we think it's not up-to-date enough even if it is, 
which would resolve itself soon? Ie, the position might "lag", but 
it's not "inconsistent". Do we believe that this lag 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is that we don't need to flush the
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something worth calling out explicitly in the KIP writeup. --
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could open all stores we find on local disk
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persists the memtable, then the
position in the .position file is ahead of the persisted offset. If IQ
is done between the crash and the state store fully restoring the
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted 
in the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening 
RocksDB stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems having the offset inside RocksDB does not help us at all? In
the end, when we 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persists the memtable, then the
position in the .position file is ahead of the persisted offset. If IQ
is done between the crash and the state store fully restoring the
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.
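
For what it's worth, "persisted together" could look roughly like the following
RocksDB WriteBatch usage (org.rocksdb classes; the method and variable names
are illustrative, not the actual RocksDBStore code):

void commitAtomically(final RocksDB db,
                      final ColumnFamilyHandle dataCf,
                      final ColumnFamilyHandle offsetsCf,
                      final byte[] key, final byte[] value,
                      final byte[] changelogOffsetKey, final byte[] serializedOffset,
                      final byte[] positionKey, final byte[] serializedPosition) {
    try (final WriteBatch batch = new WriteBatch();
         final WriteOptions options = new WriteOptions()) {
        batch.put(dataCf, key, value);                              // buffered record write(s)
        batch.put(offsetsCf, changelogOffsetKey, serializedOffset); // changelog offset
        batch.put(offsetsCf, positionKey, serializedPosition);      // IQ position
        db.write(options, batch);                                   // atomic: all or nothing
    } catch (final RocksDBException e) {
        throw new RuntimeException("commit failed", e);
    }
}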



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.
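
In other words, something along these lines (purely illustrative helper names):

long taskLag(final TaskId taskId, final TopicPartition changelog, final long endOffset) {
    final Long committed;
    if (isAssignedLocally(taskId)) {
        // assigned task: offsets come from the open store via committedOffset()
        committed = storeFor(taskId).committedOffset(changelog);
    } else {
        // unassigned but locally present: offsets come from the .checkpoint file
        committed = readCheckpointFile(taskId).get(changelog);
    }
    return committed == null ? endOffset : endOffset - committed;
}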


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems having the offset inside RocksDB does not help us at all? In
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is
written. If we let the state store manage the committed offsets, we
also need to let the state store manage the position, otherwise
they might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent 
offsets and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At 
least Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Matthias J. Sax

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in the .position file when the state store closes. 


This contradicts the KIP:


 these position offsets will be stored in RocksDB, in the same column family as 
the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a store is close()d. 


It seems having the offset inside RocksDB does not help us at all? In
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is
written. If we let the state store manage the committed offsets, we also
need to let the state store manage the position, otherwise they
might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent offsets 
and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At least 
Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.



A couple of questions / comments:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation 
(at least for our own stores, by wrapping them -- we cannot enforce it 
for custom stores I assume), and document this contract explicitly.



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I 
think

it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me, how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail.
Also, if my assumption is correct, we might want to rename the 
parameter and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores 
(including custom stores) manage their offsets internally? Maintaining 
both options and thus both code paths puts a burden on everyone and
makes the code messy. I would strongly prefer if we could have a mid-term
path to get rid of supporting both. -- For this case, we should
deprecate the newly added `managesOffsets()` method right away, to 
point out that we intend to remove it. If it's mandatory to maintain 
offsets for stores, we won't need this 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Bruno Cadonna

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is
written. If we let the state store manage the committed offsets, we also
need to let the state store manage the position, otherwise they
might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent offsets 
and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At least 
Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.



A couple of questions / comments:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I 
think

it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me, how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail.
Also, if my assumption is correct, we might want to rename the parameter 
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including 
custom stores) manage their offsets internally? Maintaining both options 
and thus both code paths puts a burden on everyone and makes the code
messy. I would strongly prefer if we could have a mid-term path to get rid
of supporting both. -- For this case, we should deprecate the newly
added `managesOffsets()` method right away, to point out that we intend 
to remove it. If it's mandatory to maintain offsets for stores, we won't 
need this method any longer. In memory stores can just return null from 
#committedOffset().



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrade seems to be common enough that it might be worth it? Even if
we did not always support this in the past, we have to face the fact
that KS is getting more and more adopted and, as a more mature product,
should support this?





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but 
locally existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that 
it knows where to find the 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-23 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.



A couple of questions / comments:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.
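
To illustrate, such a guard could be as simple as the following sketch (a
hypothetical wrapper, not the actual decorator code; it assumes the
commit(Map<TopicPartition, Long>) method proposed by the KIP):

import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

public class GuardedStateStore {

    private final StateStore inner; // the wrapped store

    public GuardedStateStore(final StateStore inner) {
        this.inner = inner;
    }

    public String name() {
        return inner.name(); // reads and other calls still delegate to the wrapped store
    }

    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // user code only ever sees the wrapped store, so calling commit()
        // directly fails fast instead of corrupting offset management
        throw new UnsupportedOperationException(
            "commit() must not be called by users; it is managed by Kafka Streams");
    }
}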



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me, how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail.
Also, if my assumption is correct, we might want to rename the parameter 
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including 
custom stores) manage their offsets internally? Maintaining both options 
and thus both code paths puts a burden on everyone and makes the code
messy. I would strongly prefer if we could have a mid-term path to get rid
of supporting both. -- For this case, we should deprecate the newly
added `managesOffsets()` method right away, to point out that we intend 
to remove it. If it's mandatory to maintain offsets for stores, we won't 
need this method any longer. In memory stores can just return null from 
#committedOffset().



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrade seems to be common enough that it might be worth it? Even if
we did not always support this in the past, we have to face the fact
that KS is getting more and more adopted and, as a more mature product,
should support this?
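
To spell out bounce 1 of that idea in config terms (purely a sketch -- whether
`upgrade.from` would actually trigger this cleanup, and which version string
applies, is exactly what the KIP would have to define):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// bounce 1: run the new code with upgrade.from set to the target (older) version,
// so that close() writes .checkpoint/.position and removes the new column family
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.7"); // version string illustrative
// bounce 2: deploy the old code and remove upgrade.from again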





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could 
timebox an effort to better understand what would be needed for the 
state store solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology
is built per stream task. So there is one instance of processor
topology and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole 
Topology
(in InternalTopologyBuilder), and pass that into 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Bruno Cadonna

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology is 
built per stream task. So there is one instance of processor topology 
and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

I don't think we need to *require* a constructor to accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
 return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore get(final TaskId taskId) {
 return returnTimestampedStore ?
 new RocksDBTimestampedStore(name, metricsScope(), taskId) :
 new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the
TaskId. Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating
it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 


wrote:


Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called.
This has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky
to actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-17 Thread Bruno Cadonna

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().
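
To illustrate the point: even with a TaskId in hand, something still has to resolve the on-disk location before a store can answer changelogOffsets(). A rough sketch, with the directory layout shown here being an assumption for illustration only:

import java.io.File;

import org.apache.kafka.streams.processor.TaskId;

public final class UnassignedStoreLocator {

    private UnassignedStoreLocator() {}

    // Resolve where a store for the given task would live on disk, e.g.
    // <state.dir>/<application.id>/<taskId>/rocksdb/<storeName>. Without the
    // state directory, the TaskId alone cannot locate this data.
    public static File storeDirFor(final File applicationStateDir, final TaskId taskId, final String storeName) {
        final File taskDir = new File(applicationStateDir, taskId.toString());
        return new File(new File(taskDir, "rocksdb"), storeName);
    }
}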


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology is 
built per stream task. So there is one instance of processor topology 
and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153 





On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:


I don't think we need to *require* a constructor to accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
 return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore get(final TaskId taskId) {
 return returnTimestampedStore ?
 new RocksDBTimestampedStore(name, metricsScope(), taskId) :
 new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the
TaskId. Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating
it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called.
This has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky
to actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of
the #get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be
completely safe to tack on.

Would also prefer the same for a ProcessorSupplier, but that's
definitely outside the scope of this KIP

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
wrote:


On further thought, 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Nick Telford
That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

> I don't think we need to *require* a constructor to accept the TaskId, but we
> would definitely make sure that the RocksDB state store changes its
> constructor to one that accepts the TaskId (which we can do without
> deprecation since it's an internal API), and custom state stores can just
> decide for themselves whether they want to opt-in/use the TaskId param
> or not. I mean custom state stores would have to opt-in anyways by
> implementing the new StoreSupplier#get(TaskId) API, and the only
> reason to do that would be to have created a constructor that accepts
> a TaskId.
>
> Just to be super clear about the proposal, this is what I had in mind.
> It's actually fairly simple and wouldn't add much to the scope of the
> KIP (I think -- if it turns out to be more complicated than I'm assuming,
> we should definitely do whatever has the smallest LOE to get this done).
>
> Anyways, the (only) public API changes would be to add this new
> method to the StoreSupplier API:
>
> default T get(final TaskId taskId) {
> return get();
> }
>
> We can decide whether or not to deprecate the old #get but it's not
> really necessary and might cause a lot of turmoil, so I'd personally
> say we just leave both APIs in place.
>
> And that's it for public API changes! Internally, we would just adapt
> each of the rocksdb StoreSupplier classes to implement this new
> API. So for example with the RocksDBKeyValueBytesStoreSupplier,
> we just add
>
> @Override
> public KeyValueStore get(final TaskId taskId) {
> return returnTimestampedStore ?
> new RocksDBTimestampedStore(name, metricsScope(), taskId) :
> new RocksDBStore(name, metricsScope(), taskId);
> }
>
> And of course add the TaskId parameter to each of the actual
> state store constructors returned here.
>
> Does that make sense? It's entirely possible I'm missing something
> important here, but I think this would be a pretty small addition that
> would solve the problem you mentioned earlier while also being
> useful to anyone who uses custom state stores.
>
> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
> wrote:
>
> > Hi Sophie,
> >
> > Interesting idea! Although what would that mean for the StateStore
> > interface? Obviously we can't require that the constructor take the
> TaskId.
> > Is it enough to add the parameter to the StoreSupplier?
> >
> > Would doing this be in-scope for this KIP, or are we over-complicating
> it?
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman  >
> > wrote:
> >
> > > Somewhat minor point overall, but it actually drives me crazy that you
> > > can't get access to the taskId of a StateStore until #init is called.
> > This
> > > has caused me a huge headache personally (since the same is true for
> > > processors and I was trying to do something that's probably too hacky
> to
> > > actually complain about here lol)
> > >
> > > Can we just change the StateStoreSupplier to receive and pass along the
> > > taskId when creating a new store? Presumably by adding a new version of
> > the
> > > #get method that takes in a taskId parameter? We can have it default to
> > > invoking the old one for compatibility reasons and it should be
> > completely
> > > safe to tack on.
> > >
> > > Would also prefer the same for a ProcessorSupplier, but that's
> definitely
> > > outside the scope of this KIP
> > >
> > > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
> > > wrote:
> > >
> > > > On further thought, it's clear that this can't work for one simple
> > > reason:
> > > > StateStores don't know their associated TaskId (and hence, their
> > > > StateDirectory) until the init() call. Therefore, committedOffset()
> > can't
> > > > be called before init(), unless we also added a StateStoreContext
> > > argument
> > > > to committedOffset(), which I think might be trying to shoehorn too
> > much
> > > > into committedOffset().
> > > >
> > > > I still don't like the idea of the Streams engine maintaining the
> cache
> > > of
> > > > changelog offsets independently of stores, mostly because of the
> > > > maintenance burden of the code duplication, but it looks like we'll
> > have
> > > to
> > > > live with it.
> > > >
> > > > Unless you have any better ideas?
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Wed, 10 Apr 2024 at 14:12, Nick Telford 
> > > wrote:
> > > >
> > > > > Hi Bruno,
> > > > >
> > > > > Immediately after I sent my response, I looked at the 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-16 Thread Sophie Blee-Goldman
I don't think we need to *require* a constructor to accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
    return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
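
As a usage sketch, the calling side could look roughly like this: one StoreSupplier is shared across the topology, but get(taskId) hands out a distinct StateStore instance per task. This assumes the proposed default method above; the factory class here is purely illustrative and not the real internal code path (which lives around ActiveTaskCreator):

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.StoreSupplier;

public final class PerTaskStoreFactory {

    private PerTaskStoreFactory() {}

    // One supplier, many stores: called once per task that materialises the store.
    // Suppliers that ignore the TaskId fall back to the zero-arg get() via the
    // proposed default method.
    public static <T extends StateStore> T createForTask(final StoreSupplier<T> supplier,
                                                         final TaskId taskId) {
        return supplier.get(taskId);
    }
}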

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:

> Hi Sophie,
>
> Interesting idea! Although what would that mean for the StateStore
> interface? Obviously we can't require that the constructor take the TaskId.
> Is it enough to add the parameter to the StoreSupplier?
>
> Would doing this be in-scope for this KIP, or are we over-complicating it?
>
> Nick
>
> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 
> wrote:
>
> > Somewhat minor point overall, but it actually drives me crazy that you
> > can't get access to the taskId of a StateStore until #init is called.
> This
> > has caused me a huge headache personally (since the same is true for
> > processors and I was trying to do something that's probably too hacky to
> > actually complain about here lol)
> >
> > Can we just change the StateStoreSupplier to receive and pass along the
> > taskId when creating a new store? Presumably by adding a new version of
> the
> > #get method that takes in a taskId parameter? We can have it default to
> > invoking the old one for compatibility reasons and it should be
> completely
> > safe to tack on.
> >
> > Would also prefer the same for a ProcessorSupplier, but that's definitely
> > outside the scope of this KIP
> >
> > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
> > wrote:
> >
> > > On further thought, it's clear that this can't work for one simple
> > reason:
> > > StateStores don't know their associated TaskId (and hence, their
> > > StateDirectory) until the init() call. Therefore, committedOffset()
> can't
> > > be called before init(), unless we also added a StateStoreContext
> > argument
> > > to committedOffset(), which I think might be trying to shoehorn too
> much
> > > into committedOffset().
> > >
> > > I still don't like the idea of the Streams engine maintaining the cache
> > of
> > > changelog offsets independently of stores, mostly because of the
> > > maintenance burden of the code duplication, but it looks like we'll
> have
> > to
> > > live with it.
> > >
> > > Unless you have any better ideas?
> > >
> > > Regards,
> > > Nick
> > >
> > > On Wed, 10 Apr 2024 at 14:12, Nick Telford 
> > wrote:
> > >
> > > > Hi Bruno,
> > > >
> > > > Immediately after I sent my response, I looked at the codebase and
> came
> > > to
> > > > the same conclusion. If it's possible at all, it will need to be done
> > by
> > > > creating temporary StateManagers and StateStores during rebalance. I
> > > think
> > > > it is possible, and probably not too expensive, but the devil will be
> > in
> > > > the detail.
> > > >
> > > > I'll try to find some time to explore the idea to see if it's
> possible
> > > and
> > > > report back, because we'll need to determine this before we can vote
> on
> > > the
> > > > KIP.
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna 
> > wrote:
> > > >
> > > >> Hi Nick,
> > > >>
> > > >> Thanks for reacting on my comments so quickly!
> > > >>
> > > >>
> > > >> 2.
> > > >> Some thoughts on your proposal.
> > > 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-15 Thread Nick Telford
Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the TaskId.
Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating it?

Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 
wrote:

> Somewhat minor point overall, but it actually drives me crazy that you
> can't get access to the taskId of a StateStore until #init is called. This
> has caused me a huge headache personally (since the same is true for
> processors and I was trying to do something that's probably too hacky to
> actually complain about here lol)
>
> Can we just change the StateStoreSupplier to receive and pass along the
> taskId when creating a new store? Presumably by adding a new version of the
> #get method that takes in a taskId parameter? We can have it default to
> invoking the old one for compatibility reasons and it should be completely
> safe to tack on.
>
> Would also prefer the same for a ProcessorSupplier, but that's definitely
> outside the scope of this KIP
>
> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford 
> wrote:
>
> > On further thought, it's clear that this can't work for one simple
> reason:
> > StateStores don't know their associated TaskId (and hence, their
> > StateDirectory) until the init() call. Therefore, committedOffset() can't
> > be called before init(), unless we also added a StateStoreContext
> argument
> > to committedOffset(), which I think might be trying to shoehorn too much
> > into committedOffset().
> >
> > I still don't like the idea of the Streams engine maintaining the cache
> of
> > changelog offsets independently of stores, mostly because of the
> > maintenance burden of the code duplication, but it looks like we'll have
> to
> > live with it.
> >
> > Unless you have any better ideas?
> >
> > Regards,
> > Nick
> >
> > On Wed, 10 Apr 2024 at 14:12, Nick Telford 
> wrote:
> >
> > > Hi Bruno,
> > >
> > > Immediately after I sent my response, I looked at the codebase and came
> > to
> > > the same conclusion. If it's possible at all, it will need to be done
> by
> > > creating temporary StateManagers and StateStores during rebalance. I
> > think
> > > it is possible, and probably not too expensive, but the devil will be
> in
> > > the detail.
> > >
> > > I'll try to find some time to explore the idea to see if it's possible
> > and
> > > report back, because we'll need to determine this before we can vote on
> > the
> > > KIP.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna 
> wrote:
> > >
> > >> Hi Nick,
> > >>
> > >> Thanks for reacting on my comments so quickly!
> > >>
> > >>
> > >> 2.
> > >> Some thoughts on your proposal.
> > >> State managers (and state stores) are parts of tasks. If the task is
> not
> > >> assigned locally, we do not create those tasks. To get the offsets
> with
> > >> your approach, we would need to either create kind of inactive tasks
> > >> besides active and standby tasks or store and manage state managers of
> > >> non-assigned tasks differently than the state managers of assigned
> > >> tasks. Additionally, the cleanup thread that removes unassigned task
> > >> directories needs to concurrently delete those inactive tasks or
> > >> task-less state managers of unassigned tasks. This seems all quite
> messy
> > >> to me.
> > >> Could we create those state managers (or state stores) for locally
> > >> existing but unassigned tasks on demand when
> > >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> > >> encapsulation for the unused task directories?
> > >>
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >>
> > >> On 4/10/24 11:31 AM, Nick Telford wrote:
> > >> > Hi Bruno,
> > >> >
> > >> > Thanks for the review!
> > >> >
> > >> > 1, 4, 5.
> > >> > Done
> > >> >
> > >> > 3.
> > >> > You're right. I've removed the offending paragraph. I had originally
> > >> > adapted this from the guarantees outlined in KIP-892. But it's
> > >> difficult to
> > >> > provide these guarantees without the KIP-892 transaction buffers.
> > >> Instead,
> > >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> > >> >
> > >> > 2.
> > >> > Good point! This is the only part of the KIP that was
> (significantly)
> > >> > changed when I extracted it from KIP-892. My prototype currently
> > >> maintains
> > >> > this "cache" of changelog offsets in .checkpoint, but doing so
> becomes
> > >> very
> > >> > messy. My intent with this change was to try to better encapsulate
> > this
> > >> > offset "caching", especially for StateStores that can cheaply
> provide
> > >> the
> > >> > offsets stored directly in them without needing to duplicate them in
> > >> this
> > >> > cache.
> > >> >
> > >> > It's clear some more work is needed here to better encapsulate this.
> > My
> > >> > immediate thought is: what if we construct *but don't initialize*
> the
> > >> > 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Sophie Blee-Goldman
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of the
#get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be completely
safe to tack on.

Would also prefer the same for a ProcessorSupplier, but that's definitely
outside the scope of this KIP

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford  wrote:

> On further thought, it's clear that this can't work for one simple reason:
> StateStores don't know their associated TaskId (and hence, their
> StateDirectory) until the init() call. Therefore, committedOffset() can't
> be called before init(), unless we also added a StateStoreContext argument
> to committedOffset(), which I think might be trying to shoehorn too much
> into committedOffset().
>
> I still don't like the idea of the Streams engine maintaining the cache of
> changelog offsets independently of stores, mostly because of the
> maintenance burden of the code duplication, but it looks like we'll have to
> live with it.
>
> Unless you have any better ideas?
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 14:12, Nick Telford  wrote:
>
> > Hi Bruno,
> >
> > Immediately after I sent my response, I looked at the codebase and came
> to
> > the same conclusion. If it's possible at all, it will need to be done by
> > creating temporary StateManagers and StateStores during rebalance. I
> think
> > it is possible, and probably not too expensive, but the devil will be in
> > the detail.
> >
> > I'll try to find some time to explore the idea to see if it's possible
> and
> > report back, because we'll need to determine this before we can vote on
> the
> > KIP.
> >
> > Regards,
> > Nick
> >
> > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for reacting on my comments so quickly!
> >>
> >>
> >> 2.
> >> Some thoughts on your proposal.
> >> State managers (and state stores) are parts of tasks. If the task is not
> >> assigned locally, we do not create those tasks. To get the offsets with
> >> your approach, we would need to either create kind of inactive tasks
> >> besides active and standby tasks or store and manage state managers of
> >> non-assigned tasks differently than the state managers of assigned
> >> tasks. Additionally, the cleanup thread that removes unassigned task
> >> directories needs to concurrently delete those inactive tasks or
> >> task-less state managers of unassigned tasks. This seems all quite messy
> >> to me.
> >> Could we create those state managers (or state stores) for locally
> >> existing but unassigned tasks on demand when
> >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> >> encapsulation for the unused task directories?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 4/10/24 11:31 AM, Nick Telford wrote:
> >> > Hi Bruno,
> >> >
> >> > Thanks for the review!
> >> >
> >> > 1, 4, 5.
> >> > Done
> >> >
> >> > 3.
> >> > You're right. I've removed the offending paragraph. I had originally
> >> > adapted this from the guarantees outlined in KIP-892. But it's
> >> difficult to
> >> > provide these guarantees without the KIP-892 transaction buffers.
> >> Instead,
> >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >> >
> >> > 2.
> >> > Good point! This is the only part of the KIP that was (significantly)
> >> > changed when I extracted it from KIP-892. My prototype currently
> >> maintains
> >> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
> >> very
> >> > messy. My intent with this change was to try to better encapsulate
> this
> >> > offset "caching", especially for StateStores that can cheaply provide
> >> the
> >> > offsets stored directly in them without needing to duplicate them in
> >> this
> >> > cache.
> >> >
> >> > It's clear some more work is needed here to better encapsulate this.
> My
> >> > immediate thought is: what if we construct *but don't initialize* the
> >> > StateManager and StateStores for every Task directory on-disk? That
> >> should
> >> > still be quite cheap to do, and would enable us to query the offsets
> for
> >> > all on-disk stores, even if they're not open. If the StateManager
> (aka.
> >> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold
> >> open
> >> > for closed stores, we could always have a "StubStateManager" in its
> >> place,
> >> > that enables the querying of offsets, but nothing else?
> >> >
> >> > IDK, what do you think?
> >> >
> >> > Regards,
> >> >
> >> > Nick
> >> >
> >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna 
> 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-12 Thread Nick Telford
On further thought, it's clear that this can't work for one simple reason:
StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset() can't
be called before init(), unless we also added a StateStoreContext argument
to committedOffset(), which I think might be trying to shoehorn too much
into committedOffset().
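
To make the trade-off explicit, the two shapes under discussion look roughly like this; both signatures are sketches of the design question, not agreed API:

import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStoreContext;

public interface CommittedOffsetsSketch {

    // Shape 1: only meaningful after init(), because the store learns its
    // TaskId and state directory from the context passed to init().
    Map<TopicPartition, Long> committedOffsets();

    // Shape 2: callable before init(), but it pushes the context into a method
    // that arguably should not need it -- the "shoehorning" concern above.
    Map<TopicPartition, Long> committedOffsets(StateStoreContext context);
}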

I still don't like the idea of the Streams engine maintaining the cache of
changelog offsets independently of stores, mostly because of the
maintenance burden of the code duplication, but it looks like we'll have to
live with it.

Unless you have any better ideas?

Regards,
Nick

On Wed, 10 Apr 2024 at 14:12, Nick Telford  wrote:

> Hi Bruno,
>
> Immediately after I sent my response, I looked at the codebase and came to
> the same conclusion. If it's possible at all, it will need to be done by
> creating temporary StateManagers and StateStores during rebalance. I think
> it is possible, and probably not too expensive, but the devil will be in
> the detail.
>
> I'll try to find some time to explore the idea to see if it's possible and
> report back, because we'll need to determine this before we can vote on the
> KIP.
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:
>
>> Hi Nick,
>>
>> Thanks for reacting on my comments so quickly!
>>
>>
>> 2.
>> Some thoughts on your proposal.
>> State managers (and state stores) are parts of tasks. If the task is not
>> assigned locally, we do not create those tasks. To get the offsets with
>> your approach, we would need to either create kind of inactive tasks
>> besides active and standby tasks or store and manage state managers of
>> non-assigned tasks differently than the state managers of assigned
>> tasks. Additionally, the cleanup thread that removes unassigned task
>> directories needs to concurrently delete those inactive tasks or
>> task-less state managers of unassigned tasks. This seems all quite messy
>> to me.
>> Could we create those state managers (or state stores) for locally
>> existing but unassigned tasks on demand when
>> TaskManager#getTaskOffsetSums() is executed? Or have a different
>> encapsulation for the unused task directories?
>>
>>
>> Best,
>> Bruno
>>
>>
>>
>> On 4/10/24 11:31 AM, Nick Telford wrote:
>> > Hi Bruno,
>> >
>> > Thanks for the review!
>> >
>> > 1, 4, 5.
>> > Done
>> >
>> > 3.
>> > You're right. I've removed the offending paragraph. I had originally
>> > adapted this from the guarantees outlined in KIP-892. But it's
>> difficult to
>> > provide these guarantees without the KIP-892 transaction buffers.
>> Instead,
>> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
>> >
>> > 2.
>> > Good point! This is the only part of the KIP that was (significantly)
>> > changed when I extracted it from KIP-892. My prototype currently
>> maintains
>> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
>> very
>> > messy. My intent with this change was to try to better encapsulate this
>> > offset "caching", especially for StateStores that can cheaply provide
>> the
>> > offsets stored directly in them without needing to duplicate them in
>> this
>> > cache.
>> >
>> > It's clear some more work is needed here to better encapsulate this. My
>> > immediate thought is: what if we construct *but don't initialize* the
>> > StateManager and StateStores for every Task directory on-disk? That
>> should
>> > still be quite cheap to do, and would enable us to query the offsets for
>> > all on-disk stores, even if they're not open. If the StateManager (aka.
>> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold
>> open
>> > for closed stores, we could always have a "StubStateManager" in its
>> place,
>> > that enables the querying of offsets, but nothing else?
>> >
>> > IDK, what do you think?
>> >
>> > Regards,
>> >
>> > Nick
>> >
>> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:
>> >
>> >> Hi Nick,
>> >>
>> >> Thanks for breaking out the KIP from KIP-892!
>> >>
>> >> Here a couple of comments/questions:
>> >>
>> >> 1.
>> >> In Kafka Streams, we have a design guideline which says to not use the
>> >> "get"-prefix for getters on the public API. Could you please change
>> >> getCommittedOffsets() to committedOffsets()?
>> >>
>> >>
>> >> 2.
>> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
>> >> offsets of tasks the stream thread does not own but that have a state
>> >> directory on the Streams client by calling
>> >> StateStore#getCommittedOffsets(). If the thread does not own a task it
>> >> does also not create any state stores for the task, which means there
>> is
>> >> no state store on which to call getCommittedOffsets().
>> >> I would have rather expected that a checkpoint file is written for all
>> >> state stores on close -- not only for the RocksDBStore -- and that this
>> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the
>> tasks

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno,

Immediately after I sent my response, I looked at the codebase and came to
the same conclusion. If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.

I'll try to find some time to explore the idea to see if it's possible and
report back, because we'll need to determine this before we can vote on the
KIP.

Regards,
Nick

On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna  wrote:

> Hi Nick,
>
> Thanks for reacting on my comments so quickly!
>
>
> 2.
> Some thoughts on your proposal.
> State managers (and state stores) are parts of tasks. If the task is not
> assigned locally, we do not create those tasks. To get the offsets with
> your approach, we would need to either create kind of inactive tasks
> besides active and standby tasks or store and manage state managers of
> non-assigned tasks differently than the state managers of assigned
> tasks. Additionally, the cleanup thread that removes unassigned task
> directories needs to concurrently delete those inactive tasks or
> task-less state managers of unassigned tasks. This seems all quite messy
> to me.
> Could we create those state managers (or state stores) for locally
> existing but unassigned tasks on demand when
> TaskManager#getTaskOffsetSums() is executed? Or have a different
> encapsulation for the unused task directories?
>
>
> Best,
> Bruno
>
>
>
> On 4/10/24 11:31 AM, Nick Telford wrote:
> > Hi Bruno,
> >
> > Thanks for the review!
> >
> > 1, 4, 5.
> > Done
> >
> > 3.
> > You're right. I've removed the offending paragraph. I had originally
> > adapted this from the guarantees outlined in KIP-892. But it's difficult
> to
> > provide these guarantees without the KIP-892 transaction buffers.
> Instead,
> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >
> > 2.
> > Good point! This is the only part of the KIP that was (significantly)
> > changed when I extracted it from KIP-892. My prototype currently
> maintains
> > this "cache" of changelog offsets in .checkpoint, but doing so becomes
> very
> > messy. My intent with this change was to try to better encapsulate this
> > offset "caching", especially for StateStores that can cheaply provide the
> > offsets stored directly in them without needing to duplicate them in this
> > cache.
> >
> > It's clear some more work is needed here to better encapsulate this. My
> > immediate thought is: what if we construct *but don't initialize* the
> > StateManager and StateStores for every Task directory on-disk? That
> should
> > still be quite cheap to do, and would enable us to query the offsets for
> > all on-disk stores, even if they're not open. If the StateManager (aka.
> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold
> open
> > for closed stores, we could always have a "StubStateManager" in its
> place,
> > that enables the querying of offsets, but nothing else?
> >
> > IDK, what do you think?
> >
> > Regards,
> >
> > Nick
> >
> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for breaking out the KIP from KIP-892!
> >>
> >> Here a couple of comments/questions:
> >>
> >> 1.
> >> In Kafka Streams, we have a design guideline which says to not use the
> >> "get"-prefix for getters on the public API. Could you please change
> >> getCommittedOffsets() to committedOffsets()?
> >>
> >>
> >> 2.
> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> >> offsets of tasks the stream thread does not own but that have a state
> >> directory on the Streams client by calling
> >> StateStore#getCommittedOffsets(). If the thread does not own a task it
> >> does also not create any state stores for the task, which means there is
> >> no state store on which to call getCommittedOffsets().
> >> I would have rather expected that a checkpoint file is written for all
> >> state stores on close -- not only for the RocksDBStore -- and that this
> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> >> that have a state directory on the client but are not currently assigned
> >> to any stream thread of the Streams client.
> >>
> >>
> >> 3.
> >> In the javadocs for commit() you write
> >>
> >> "... all writes since the last commit(Map), or since init(StateStore)
> >> *MUST* be available to readers, even after a restart."
> >>
> >> This is only true for a clean close before the restart, isn't it?
> >> If the task fails with a dirty close, Kafka Streams cannot guarantee
> >> that the in-memory structures of the state store (e.g. memtable in the
> >> case of RocksDB) are flushed so that the records and the committed
> >> offsets are persisted.
> >>
> >>
> >> 4.
> >> The wrapper that provides the legacy checkpointing behavior is actually
> >> an implementation detail. I would remove it from the KIP, but still
> >> state that the legacy 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Bruno Cadonna

Hi Nick,

Thanks for reacting on my comments so quickly!


2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not 
assigned locally, we do not create those tasks. To get the offsets with 
your approach, we would need to either create kind of inactive tasks 
besides active and standby tasks or store and manage state managers of 
non-assigned tasks differently than the state managers of assigned 
tasks. Additionally, the cleanup thread that removes unassigned task 
directories needs to concurrently delete those inactive tasks or 
task-less state managers of unassigned tasks. This seems all quite messy 
to me.
Could we create those state managers (or state stores) for locally 
existing but unassigned tasks on demand when 
TaskManager#getTaskOffsetSums() is executed? Or have a different 
encapsulation for the unused task directories?
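
A rough sketch of the "on demand" variant, purely to pin down the lifecycle concern: whatever gets built for a locally existing but unassigned task must be read and torn down again promptly (and would need to coordinate with the cleanup thread). The builder and reader functions are placeholders, not proposed API:

import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

public final class OnDemandOffsetReader {

    private OnDemandOffsetReader() {}

    // Briefly materialise state for an unassigned-but-local task, read the
    // offsets it exposes, and close it again. try-with-resources guarantees
    // the teardown even if reading fails.
    public static <S extends AutoCloseable> Map<TopicPartition, Long> offsetsFor(
            final TaskId taskId,
            final Function<TaskId, S> temporaryStateBuilder,
            final Function<S, Map<TopicPartition, Long>> offsetsReader) throws Exception {

        try (final S temporaryState = temporaryStateBuilder.apply(taskId)) {
            return offsetsReader.apply(temporaryState);
        }
    }
}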



Best,
Bruno



On 4/10/24 11:31 AM, Nick Telford wrote:

Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:


Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.


3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.


4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.


5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444.


Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic

Checkpointing"

section from KIP-892: Transactional Semantics for StateStores, into its

own

KIP

KIP-1035: StateStore managed changelog offsets


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets


While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892,

and

a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Nick Telford
Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?
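
For what it's worth, the stub idea could be as small as something like this; the class below is a hypothetical illustration only and does not mirror the real ProcessorStateManager interface:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class StubStateManager {

    private final Map<TopicPartition, Long> committedOffsets;

    public StubStateManager(final Map<TopicPartition, Long> committedOffsets) {
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
    }

    // The only supported operation: expose the offsets found on disk so that
    // task lag can be computed for closed/unassigned stores.
    public Map<TopicPartition, Long> changelogOffsets() {
        return committedOffsets;
    }

    // Everything else a real state manager does would be unsupported here.
    public void flush() {
        throw new UnsupportedOperationException("stub state manager: offsets only");
    }
}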

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:

> Hi Nick,
>
> Thanks for breaking out the KIP from KIP-892!
>
> Here a couple of comments/questions:
>
> 1.
> In Kafka Streams, we have a design guideline which says to not use the
> "get"-prefix for getters on the public API. Could you please change
> getCommittedOffsets() to committedOffsets()?
>
>
> 2.
> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> offsets of tasks the stream thread does not own but that have a state
> directory on the Streams client by calling
> StateStore#getCommittedOffsets(). If the thread does not own a task it
> does also not create any state stores for the task, which means there is
> no state store on which to call getCommittedOffsets().
> I would have rather expected that a checkpoint file is written for all
> state stores on close -- not only for the RocksDBStore -- and that this
> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> that have a state directory on the client but are not currently assigned
> to any stream thread of the Streams client.
>
>
> 3.
> In the javadocs for commit() you write
>
> "... all writes since the last commit(Map), or since init(StateStore)
> *MUST* be available to readers, even after a restart."
>
> This is only true for a clean close before the restart, isn't it?
> If the task fails with a dirty close, Kafka Streams cannot guarantee
> that the in-memory structures of the state store (e.g. memtable in the
> case of RocksDB) are flushed so that the records and the committed
> offsets are persisted.
>
>
> 4.
> The wrapper that provides the legacy checkpointing behavior is actually
> an implementation detail. I would remove it from the KIP, but still
> state that the legacy checkpointing behavior will be supported when the
> state store does not manage the checkpoints.
>
>
> 5.
> Regarding the metrics, could you please add the tags, and the recording
> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
>
>
> Best,
> Bruno
>
> On 4/7/24 5:35 PM, Nick Telford wrote:
> > Hi everyone,
> >
> > Based on some offline discussion, I've split out the "Atomic Checkpointing"
> > section from KIP-892: Transactional Semantics for StateStores, into its own
> > KIP
> >
> > KIP-1035: StateStore managed changelog offsets
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >
> > While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> > changes were always the most contentious part, and continued to spur
> > discussion even after KIP-892 was adopted.
> >
> > All the changes introduced in KIP-1035 have been removed from KIP-892, and
> > a hard dependency on KIP-1035 has been added to KIP-892 in their place.
> >
> > I'm hopeful that with some more focus on this set of changes, we can
> > deliver something that we're all happy with.
> >
> > Regards,
> > Nick
> >
>


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-09 Thread Bruno Cadonna

Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the 
"get"-prefix for getters on the public API. Could you please change 
getCommittedOffsets() to committedOffsets()?



2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read 
offsets of tasks the stream thread does not own but that have a state 
directory on the Streams client by calling 
StateStore#getCommittedOffsets(). If the thread does not own a task it 
does also not create any state stores for the task, which means there is 
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all 
state stores on close -- not only for the RocksDBStore -- and that this 
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks 
that have a state directory on the client but are not currently assigned 
to any stream thread of the Streams client.



3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore) 
*MUST* be available to readers, even after a restart."


This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee 
that the in-memory structures of the state store (e.g. memtable in the 
case of RocksDB) are flushed so that the records and the committed 
offsets are persisted.



4.
The wrapper that provides the legacy checkpointing behavior is actually 
an implementation detail. I would remove it from the KIP, but still 
state that the legacy checkpointing behavior will be supported when the 
state store does not manage the checkpoints.



5.
Regarding the metrics, could you please add the tags, and the recording 
level (DEBUG or INFO) as done in KIP-607 or KIP-444.
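
For concreteness, a minimal sketch of what a tagged, DEBUG-level metric registration looks like with the common metrics API; the metric name, group, and tags below are placeholders, not the names the KIP will define:

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public final class OffsetMetricsSketch {

    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();

        // tags identify the thread, task and store the metric belongs to
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", "stream-thread-1");
        tags.put("task-id", "0_0");
        tags.put("state-id", "my-store");

        // DEBUG recording level: the sensor only records when debug metrics are enabled
        final Sensor sensor = metrics.sensor("commit-offsets", Sensor.RecordingLevel.DEBUG);
        final MetricName rateName = metrics.metricName(
            "commit-offsets-rate", "stream-state-metrics", "placeholder description", tags);
        sensor.add(rateName, new Rate());

        sensor.record();
        metrics.close();
    }
}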



Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick



[DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-07 Thread Nick Telford
Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick