Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol).

Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of the
#get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be completely
safe to tack on.
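
A rough sketch of what such an overload could look like. This uses
simplified stand-in types rather than the real Kafka Streams interfaces,
and the get(TaskId) overload, TaskAwareStore and TaskAwareSupplier are all
hypothetical names made up for illustration:

```java
import java.util.Objects;

// Sketch only: every type here is a simplified stand-in, not the Kafka Streams API.
final class TaskAwareSupplierSketch {

    /** Simplified stand-in for a task identifier (subtopology + partition). */
    static final class TaskId {
        final int subtopology;
        final int partition;

        TaskId(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    /** Simplified stand-in for a state store. */
    interface StateStore {
        String name();
    }

    /** Simplified stand-in for the supplier, plus the hypothetical overload. */
    interface StoreSupplier<T extends StateStore> {
        T get();

        // Hypothetical addition: defaults to the old method, so existing
        // suppliers keep compiling and behave exactly as before.
        default T get(TaskId taskId) {
            return get();
        }
    }

    /** A store that can learn its task at construction time instead of at init. */
    static final class TaskAwareStore implements StateStore {
        private final String name;
        private final TaskId taskId; // null when created through the legacy get()

        TaskAwareStore(String name, TaskId taskId) {
            this.name = name;
            this.taskId = taskId;
        }

        @Override
        public String name() {
            return name;
        }

        String describe() {
            return taskId == null
                ? name + " (task unknown until init)"
                : name + " (task " + taskId + ")";
        }
    }

    /** A supplier that opts in to the new overload. */
    static final class TaskAwareSupplier implements StoreSupplier<TaskAwareStore> {
        @Override
        public TaskAwareStore get() {
            return new TaskAwareStore("counts", null);
        }

        @Override
        public TaskAwareStore get(TaskId taskId) {
            return new TaskAwareStore("counts", Objects.requireNonNull(taskId));
        }
    }

    public static void main(String[] args) {
        // A legacy supplier only implements get(); the default overload delegates to it.
        StoreSupplier<StateStore> legacy = () -> () -> "legacy-store";
        System.out.println(legacy.get(new TaskId(0, 1)).name());

        // An opted-in supplier hands the task id to the store up front.
        System.out.println(new TaskAwareSupplier().get(new TaskId(0, 1)).describe());
    }
}
```

The default method is what keeps this compatible: a supplier that only
implements the old get() still works, and only suppliers that override the
new overload see the taskId.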

Would also prefer the same for a ProcessorSupplier, but that's definitely
outside the scope of this KIP.

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com> wrote:

> On further thought, it's clear that this can't work for one simple reason:
> StateStores don't know their associated TaskId (and hence, their
> StateDirectory) until the init() call. Therefore, committedOffset() can't
> be called before init(), unless we also added a StateStoreContext argument
> to committedOffset(), which I think might be trying to shoehorn too much
> into committedOffset().
>
> I still don't like the idea of the Streams engine maintaining the cache of
> changelog offsets independently of stores, mostly because of the
> maintenance burden of the code duplication, but it looks like we'll have to
> live with it.
>
> Unless you have any better ideas?
>
> Regards,
> Nick
>
> On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > Immediately after I sent my response, I looked at the codebase and came to
> > the same conclusion. If it's possible at all, it will need to be done by
> > creating temporary StateManagers and StateStores during rebalance. I think
> > it is possible, and probably not too expensive, but the devil will be in
> > the detail.
> >
> > I'll try to find some time to explore the idea to see if it's possible and
> > report back, because we'll need to determine this before we can vote on the
> > KIP.
> >
> > Regards,
> > Nick
> >
> > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for reacting to my comments so quickly!
> >>
> >>
> >> 2.
> >> Some thoughts on your proposal.
> >> State managers (and state stores) are parts of tasks. If the task is not
> >> assigned locally, we do not create those tasks. To get the offsets with
> >> your approach, we would need to either create kind of inactive tasks
> >> besides active and standby tasks or store and manage state managers of
> >> non-assigned tasks differently than the state managers of assigned
> >> tasks. Additionally, the cleanup thread that removes unassigned task
> >> directories needs to concurrently delete those inactive tasks or
> >> task-less state managers of unassigned tasks. This seems all quite messy
> >> to me.
> >> Could we create those state managers (or state stores) for locally
> >> existing but unassigned tasks on demand when
> >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> >> encapsulation for the unused task directories?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 4/10/24 11:31 AM, Nick Telford wrote:
> >> > Hi Bruno,
> >> >
> >> > Thanks for the review!
> >> >
> >> > 1, 4, 5.
> >> > Done
> >> >
> >> > 3.
> >> > You're right. I've removed the offending paragraph. I had originally
> >> > adapted this from the guarantees outlined in KIP-892. But it's difficult to
> >> > provide these guarantees without the KIP-892 transaction buffers. Instead,
> >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands.
> >> >
> >> > 2.
> >> > Good point! This is the only part of the KIP that was (significantly)
> >> > changed when I extracted it from KIP-892. My prototype currently maintains
> >> > this "cache" of changelog offsets in .checkpoint, but doing so becomes very
> >> > messy. My intent with this change was to try to better encapsulate this
> >> > offset "caching", especially for StateStores that can cheaply provide the
> >> > offsets stored directly in them without needing to duplicate them in this
> >> > cache.
> >> >
> >> > It's clear some more work is needed here to better encapsulate this. My
> >> > immediate thought is: what if we construct *but don't initialize* the
> >> > StateManager and StateStores for every Task directory on-disk? That should
> >> > still be quite cheap to do, and would enable us to query the offsets for
> >> > all on-disk stores, even if they're not open. If the StateManager (aka.
> >> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
> >> > for closed stores, we could always have a "StubStateManager" in its place,
> >> > that enables the querying of offsets, but nothing else?
> >> >
> >> > IDK, what do you think?
> >> >
> >> > Regards,
> >> >
> >> > Nick
> >> >
> >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:
> >> >
> >> >> Hi Nick,
> >> >>
> >> >> Thanks for breaking out the KIP from KIP-892!
> >> >>
> >> >> Here are a couple of comments/questions:
> >> >>
> >> >> 1.
> >> >> In Kafka Streams, we have a design guideline which says to not use the
> >> >> "get"-prefix for getters on the public API. Could you please change
> >> >> getCommittedOffsets() to committedOffsets()?
> >> >>
> >> >>
> >> >> 2.
> >> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read
> >> >> offsets of tasks the stream thread does not own but that have a state
> >> >> directory on the Streams client by calling
> >> >> StateStore#getCommittedOffsets(). If the thread does not own a task it
> >> >> does also not create any state stores for the task, which means there is
> >> >> no state store on which to call getCommittedOffsets().
> >> >> I would have rather expected that a checkpoint file is written for all
> >> >> state stores on close -- not only for the RocksDBStore -- and that this
> >> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
> >> >> that have a state directory on the client but are not currently assigned
> >> >> to any stream thread of the Streams client.
> >> >>
> >> >>
> >> >> 3.
> >> >> In the javadocs for commit() you write
> >> >>
> >> >> "... all writes since the last commit(Map), or since init(StateStore)
> >> >> *MUST* be available to readers, even after a restart."
> >> >>
> >> >> This is only true for a clean close before the restart, isn't it?
> >> >> If the task fails with a dirty close, Kafka Streams cannot guarantee
> >> >> that the in-memory structures of the state store (e.g. memtable in the
> >> >> case of RocksDB) are flushed so that the records and the committed
> >> >> offsets are persisted.
> >> >>
> >> >>
> >> >> 4.
> >> >> The wrapper that provides the legacy checkpointing behavior is actually
> >> >> an implementation detail. I would remove it from the KIP, but still
> >> >> state that the legacy checkpointing behavior will be supported when the
> >> >> state store does not manage the checkpoints.
> >> >>
> >> >>
> >> >> 5.
> >> >> Regarding the metrics, could you please add the tags, and the recording
> >> >> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
> >> >>
> >> >>
> >> >> Best,
> >> >> Bruno
> >> >>
> >> >> On 4/7/24 5:35 PM, Nick Telford wrote:
> >> >>> Hi everyone,
> >> >>>
> >> >>> Based on some offline discussion, I've split out the "Atomic Checkpointing"
> >> >>> section from KIP-892: Transactional Semantics for StateStores, into its own
> >> >>> KIP
> >> >>>
> >> >>> KIP-1035: StateStore managed changelog offsets
> >> >>>
> >> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >> >>>
> >> >>> While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> >> >>> changes were always the most contentious part, and continued to spur
> >> >>> discussion even after KIP-892 was adopted.
> >> >>>
> >> >>> All the changes introduced in KIP-1035 have been removed from KIP-892, and
> >> >>> a hard dependency on KIP-1035 has been added to KIP-892 in their place.
> >> >>>
> >> >>> I'm hopeful that with some more focus on this set of changes, we can
> >> >>> deliver something that we're all happy with.
> >> >>>
> >> >>> Regards,
> >> >>> Nick
> >> >>>
> >> >>
> >> >
> >>
> >
>
