Somewhat minor point overall, but it actually drives me crazy that you can't get access to the taskId of a StateStore until #init is called. This has caused me a huge headache personally (since the same is true for processors, and I was trying to do something that's probably too hacky to actually complain about here lol).
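
To make that concrete, here is a rough Java sketch using simplified stand-in types (SketchTaskId, SketchContext, SketchStore, SketchStoreSupplier and TaskAwareSupplier are all made up for illustration and are not the real Kafka Streams interfaces). It shows a store whose task is unknown until init(), next to a hypothetical supplier overload, get(taskId), that falls back to the no-arg get() by default. Treat it as a sketch of the idea under those assumptions, not as a proposed final API.

```java
// Simplified stand-ins for illustration only; NOT the real Kafka Streams types.
final class SketchTaskId {
    final int subtopology;
    final int partition;

    SketchTaskId(int subtopology, int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

// Stand-in for the part of the store context that exposes the task.
interface SketchContext {
    SketchTaskId taskId();
}

// A store that can learn its task either at construction or, as today, in init().
final class SketchStore {
    private final String name;
    private SketchTaskId taskId; // stays null until init() if not passed in

    SketchStore(String name, SketchTaskId taskId) {
        this.name = name;
        this.taskId = taskId;
    }

    void init(SketchContext context) {
        this.taskId = context.taskId();
    }

    String name() { return name; }

    SketchTaskId taskIdOrNull() { return taskId; }
}

// Hypothetical supplier shape: get(SketchTaskId) is an assumption, not an existing API.
interface SketchStoreSupplier {
    SketchStore get();

    // Defaults to the old method, so existing suppliers keep working unchanged.
    default SketchStore get(SketchTaskId taskId) {
        return get();
    }
}

// A supplier that opts in and hands the task to the store up front.
final class TaskAwareSupplier implements SketchStoreSupplier {
    private final String name;

    TaskAwareSupplier(String name) { this.name = name; }

    @Override
    public SketchStore get() { return new SketchStore(name, null); }

    @Override
    public SketchStore get(SketchTaskId taskId) { return new SketchStore(name, taskId); }
}

class SupplierSketchDemo {
    public static void main(String[] args) {
        SketchTaskId task = new SketchTaskId(0, 3);

        // Old-style supplier: only implements get(), so the task is dropped.
        SketchStoreSupplier legacy = () -> new SketchStore("counts", null);
        System.out.println("legacy before init: " + legacy.get(task).taskIdOrNull());

        // Opt-in supplier: the store knows its task before init() is ever called.
        SketchStore store = new TaskAwareSupplier("counts").get(task);
        System.out.println("task-aware before init: " + store.taskIdOrNull());
    }
}
```

In this sketch a supplier that only implements get() behaves exactly as before, and only suppliers that override the new overload see the task early.
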
Can we just change the StateStoreSupplier to receive and pass along the taskId when creating a new store? Presumably by adding a new version of the #get method that takes in a taskId parameter? We can have it default to invoking the old one for compatibility reasons, and it should be completely safe to tack on.

Would also prefer the same for a ProcessorSupplier, but that's definitely outside the scope of this KIP.

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com> wrote:

> On further thought, it's clear that this can't work for one simple reason:
> StateStores don't know their associated TaskId (and hence, their
> StateDirectory) until the init() call. Therefore, committedOffset() can't
> be called before init(), unless we also added a StateStoreContext argument
> to committedOffset(), which I think might be trying to shoehorn too much
> into committedOffset().
>
> I still don't like the idea of the Streams engine maintaining the cache of
> changelog offsets independently of stores, mostly because of the
> maintenance burden of the code duplication, but it looks like we'll have to
> live with it.
>
> Unless you have any better ideas?
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > Immediately after I sent my response, I looked at the codebase and came to
> > the same conclusion. If it's possible at all, it will need to be done by
> > creating temporary StateManagers and StateStores during rebalance. I think
> > it is possible, and probably not too expensive, but the devil will be in
> > the detail.
> >
> > I'll try to find some time to explore the idea to see if it's possible and
> > report back, because we'll need to determine this before we can vote on the
> > KIP.
> >
> > Regards,
> > Nick
> >
> > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for reacting on my comments so quickly!
> >>
> >> 2.
> >> Some thoughts on your proposal.
> >> State managers (and state stores) are parts of tasks. If the task is not
> >> assigned locally, we do not create those tasks. To get the offsets with
> >> your approach, we would need to either create kind of inactive tasks
> >> besides active and standby tasks or store and manage state managers of
> >> non-assigned tasks differently than the state managers of assigned
> >> tasks. Additionally, the cleanup thread that removes unassigned task
> >> directories needs to concurrently delete those inactive tasks or
> >> task-less state managers of unassigned tasks. This seems all quite messy
> >> to me.
> >> Could we create those state managers (or state stores) for locally
> >> existing but unassigned tasks on demand when
> >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> >> encapsulation for the unused task directories?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 4/10/24 11:31 AM, Nick Telford wrote:
> >> > Hi Bruno,
> >> >
> >> > Thanks for the review!
> >> >
> >> > 1, 4, 5.
> >> > Done
> >> >
> >> > 3.
> >> > You're right. I've removed the offending paragraph. I had originally
> >> > adapted this from the guarantees outlined in KIP-892. But it's difficult to
> >> > provide these guarantees without the KIP-892 transaction buffers. Instead,
> >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >> >
> >> > 2.
> >> > Good point! This is the only part of the KIP that was (significantly)
> >> > changed when I extracted it from KIP-892. My prototype currently maintains
> >> > this "cache" of changelog offsets in .checkpoint, but doing so becomes very
> >> > messy. My intent with this change was to try to better encapsulate this
> >> > offset "caching", especially for StateStores that can cheaply provide the
> >> > offsets stored directly in them without needing to duplicate them in this
> >> > cache.
> >> >
> >> > It's clear some more work is needed here to better encapsulate this. My
> >> > immediate thought is: what if we construct *but don't initialize* the
> >> > StateManager and StateStores for every Task directory on-disk? That should
> >> > still be quite cheap to do, and would enable us to query the offsets for
> >> > all on-disk stores, even if they're not open. If the StateManager (aka.
> >> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
> >> > for closed stores, we could always have a "StubStateManager" in its place,
> >> > that enables the querying of offsets, but nothing else?
> >> >
> >> > IDK, what do you think?
> >> >
> >> > Regards,
> >> >
> >> > Nick
> >> >
> >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:
> >> >
> >> >> Hi Nick,
> >> >>
> >> >> Thanks for breaking out the KIP from KIP-892!
> >> >>
> >> >> Here a couple of comments/questions:
> >> >>
> >> >> 1.
> >> >> In Kafka Streams, we have a design guideline which says to not use the
> >> >> "get"-prefix for getters on the public API. Could you please change
> >> >> getCommittedOffsets() to committedOffsets()?
> >> >>
> >> >> 2.
> >> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> >> >> offsets of tasks the stream thread does not own but that have a state
> >> >> directory on the Streams client by calling
> >> >> StateStore#getCommittedOffsets(). If the thread does not own a task it
> >> >> does also not create any state stores for the task, which means there is
> >> >> no state store on which to call getCommittedOffsets().
> >> >> I would have rather expected that a checkpoint file is written for all
> >> >> state stores on close -- not only for the RocksDBStore -- and that this
> >> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> >> >> that have a state directory on the client but are not currently assigned
> >> >> to any stream thread of the Streams client.
> >> >>
> >> >> 3.
> >> >> In the javadocs for commit() you write
> >> >>
> >> >> "... all writes since the last commit(Map), or since init(StateStore)
> >> >> *MUST* be available to readers, even after a restart."
> >> >>
> >> >> This is only true for a clean close before the restart, isn't it?
> >> >> If the task fails with a dirty close, Kafka Streams cannot guarantee
> >> >> that the in-memory structures of the state store (e.g. memtable in the
> >> >> case of RocksDB) are flushed so that the records and the committed
> >> >> offsets are persisted.
> >> >>
> >> >> 4.
> >> >> The wrapper that provides the legacy checkpointing behavior is actually
> >> >> an implementation detail. I would remove it from the KIP, but still
> >> >> state that the legacy checkpointing behavior will be supported when the
> >> >> state store does not manage the checkpoints.
> >> >>
> >> >> 5.
> >> >> Regarding the metrics, could you please add the tags, and the recording
> >> >> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
> >> >>
> >> >> Best,
> >> >> Bruno
> >> >>
> >> >> On 4/7/24 5:35 PM, Nick Telford wrote:
> >> >>> Hi everyone,
> >> >>>
> >> >>> Based on some offline discussion, I've split out the "Atomic Checkpointing"
> >> >>> section from KIP-892: Transactional Semantics for StateStores, into its own
> >> >>> KIP
> >> >>>
> >> >>> KIP-1035: StateStore managed changelog offsets
> >> >>>
> >> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >> >>>
> >> >>> While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> >> >>> changes were always the most contentious part, and continued to spur
> >> >>> discussion even after KIP-892 was adopted.
> >> >>>
> >> >>> All the changes introduced in KIP-1035 have been removed from KIP-892, and
> >> >>> a hard dependency on KIP-1035 has been added to KIP-892 in their place.
> >> >>>
> >> >>> I'm hopeful that with some more focus on this set of changes, we can
> >> >>> deliver something that we're all happy with.
> >> >>>
> >> >>> Regards,
> >> >>> Nick