Hi Nick,

Thanks for reacting on my comments so quickly!


2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not assigned locally, we do not create those tasks. To get the offsets with your approach, we would need to either create kind of inactive tasks besides active and standby tasks or store and manage state managers of non-assigned tasks differently than the state managers of assigned tasks. Additionally, the cleanup thread that removes unassigned task directories needs to concurrently delete those inactive tasks or task-less state managers of unassigned tasks. This seems all quite messy to me. Could we create those state managers (or state stores) for locally existing but unassigned tasks on demand when TaskManager#getTaskOffsetSums() is executed? Or have a different encapsulation for the unused task directories?


Best,
Bruno



On 4/10/24 11:31 AM, Nick Telford wrote:
Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.


3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.


4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.


5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444.


Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:
Hi everyone,

Based on some offline discussion, I've split out the "Atomic
Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its
own
KIP

KIP-1035: StateStore managed changelog offsets

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892,
and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick



Reply via email to