Hi Nick,
Thanks for breaking out the KIP from KIP-892!
Here a couple of comments/questions:
1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?
2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.
3.
In the javadocs for commit() you write
"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."
This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.
4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.
5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444.
Best,
Bruno
On 4/7/24 5:35 PM, Nick Telford wrote:
Hi everyone,
Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP
KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.
All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.
I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.
Regards,
Nick