Hi Nick,

Thanks for splitting this KIP out of KIP-892!

Here are a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline not to use the "get" prefix for getters in the public API. Could you please rename getCommittedOffsets() to committedOffsets()?
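For illustration, a minimal sketch of the getter following that guideline. The interface and class names are hypothetical, and changelog partitions are simplified to "topic:partition" strings instead of TopicPartition; this is not the signature proposed in the KIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the StateStore method under discussion.
interface ManagesChangelogOffsets {
    // Public Kafka Streams API style: no "get" prefix on getters
    Map<String, Long> committedOffsets();
}

final class InMemoryOffsets implements ManagesChangelogOffsets {
    private final Map<String, Long> offsets = new HashMap<>();

    // Hypothetical helper used only to populate the example
    void record(String changelogPartition, long offset) {
        offsets.put(changelogPartition, offset);
    }

    @Override
    public Map<String, Long> committedOffsets() {
        // Read-only view so callers cannot mutate the store's bookkeeping
        return Collections.unmodifiableMap(offsets);
    }
}
```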


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should use StateStore#getCommittedOffsets() to read the offsets of tasks that the stream thread does not own but that have a state directory on the Streams client. If the thread does not own a task, it also does not create any state stores for that task, so there is no state store on which to call getCommittedOffsets(). I would rather have expected that a checkpoint file is written on close for all state stores, not only for the RocksDBStore, and that TaskManager#getTaskOffsetSums() reads this checkpoint file for the tasks that have a state directory on the client but are not currently assigned to any stream thread of the Streams client.
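A rough sketch of the suggested fallback: computing a task's offset sum from a checkpoint file when no StateStore instance exists. It assumes the checkpoint keeps the plain-text layout of the existing .checkpoint files (a version line, an entry-count line, then one "topic partition offset" line per changelog partition); the class and method names are hypothetical and not part of Kafka Streams.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: offset sum for a task that has a state directory on
// this client but is not assigned to any stream thread.
final class UnassignedTaskOffsets {

    // Assumed layout: version line "0", entry-count line, then
    // one "topic partition offset" line per changelog partition.
    static Map<String, Long> parseCheckpoint(List<String> lines) {
        if (lines.size() < 2 || !lines.get(0).trim().equals("0")) {
            throw new IllegalArgumentException("unsupported checkpoint version");
        }
        int expected = Integer.parseInt(lines.get(1).trim());
        Map<String, Long> offsets = new HashMap<>();
        for (String line : lines.subList(2, lines.size())) {
            if (line.isBlank()) {
                continue;
            }
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("malformed entry: " + line);
            }
            // Key is "topic:partition"; ':' cannot appear in Kafka topic names
            offsets.put(parts[0] + ":" + parts[1], Long.parseLong(parts[2]));
        }
        if (offsets.size() != expected) {
            throw new IllegalArgumentException(
                "expected " + expected + " entries but found " + offsets.size());
        }
        return offsets;
    }

    static long offsetSum(Map<String, Long> offsets) {
        return offsets.values().stream().mapToLong(Long::longValue).sum();
    }

    // Empty when no checkpoint was written for the task (e.g. after a dirty close)
    static OptionalLong offsetSumFromFile(Path checkpointFile) throws IOException {
        if (!Files.exists(checkpointFile)) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(offsetSum(parseCheckpoint(Files.readAllLines(checkpointFile))));
    }
}
```

Keeping the parsing separate from file I/O means the same logic could serve both the close path (write) and the assignment path (read) without opening the store.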


3.
In the javadoc for commit(), you write:

"... all writes since the last commit(Map), or since init(StateStore) *MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee that the in-memory structures of the state store (e.g., the memtable in the case of RocksDB) are flushed, so neither the records nor the committed offsets are guaranteed to be persisted.
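A toy model of the point above: writes and the committed offset are buffered in memory (like a RocksDB memtable) and only become durable when flushed, which here happens on a clean close. The classes are hypothetical and deliberately simplified; they are not how RocksDBStore works internally.

```java
import java.util.HashMap;
import java.util.Map;

// Stands in for whatever survives a process restart
final class Disk {
    final Map<String, String> records = new HashMap<>();
    Long committedOffset = null;
}

// Hypothetical toy store: buffers writes and the committed offset in memory
final class ToyStore {
    private final Disk disk;
    private final Map<String, String> memtable = new HashMap<>();
    private Long pendingOffset = null;

    ToyStore(Disk disk) {
        this.disk = disk;
    }

    void put(String key, String value) {
        memtable.put(key, value);
    }

    // Records the changelog offset next to the data, still only in memory
    void commit(long changelogOffset) {
        pendingOffset = changelogOffset;
    }

    private void flush() {
        disk.records.putAll(memtable);
        memtable.clear();
        if (pendingOffset != null) {
            disk.committedOffset = pendingOffset;
            pendingOffset = null;
        }
    }

    void closeClean() {
        flush();
    }

    // Simulates a crash: the in-memory buffer is lost without being flushed
    void closeDirty() {
        memtable.clear();
        pendingOffset = null;
    }

    String get(String key) {
        return memtable.containsKey(key) ? memtable.get(key) : disk.records.get(key);
    }

    Long committedOffset() {
        return pendingOffset != null ? pendingOffset : disk.committedOffset;
    }
}
```

After a clean close, a store reopened on the same Disk sees both the record and offset 10; after a dirty close it sees neither. Records and offset are lost together, so the store stays consistent, but "available even after a restart" does not hold.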


4.
The wrapper that provides the legacy checkpointing behavior is actually an implementation detail. I would remove it from the KIP, but still state that the legacy checkpointing behavior will be supported when the state store does not manage the checkpoints.


5.
Regarding the metrics, could you please add the tags and the recording level (DEBUG or INFO), as done in KIP-607 and KIP-444?


Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:
Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP:

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick
