That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?
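
To make the question concrete, here is a minimal sketch of the two behaviours
I'm trying to tell apart. Everything in it (FakeStore, sharedAcrossTasks,
suppliedPerTask) is a hypothetical stand-in I made up for illustration, not
the actual Kafka Streams classes or the real code path in
InternalTopologyBuilder:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PerTaskStoreSketch {
    // Hypothetical stand-in for a StateStore; NOT the Kafka Streams interface.
    static final class FakeStore {
        final String name;

        FakeStore(final String name) {
            this.name = name;
        }
    }

    // What the code appears to do on my reading: one instance is built up
    // front and the same reference is handed to every task.
    static List<FakeStore> sharedAcrossTasks(final FakeStore store, final int numTasks) {
        final List<FakeStore> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(store);
        }
        return perTask;
    }

    // What must happen somewhere for tasks to have independent stores: the
    // supplier is invoked once per task, yielding a fresh instance each time.
    static List<FakeStore> suppliedPerTask(final Supplier<FakeStore> supplier, final int numTasks) {
        final List<FakeStore> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(supplier.get());
        }
        return perTask;
    }

    public static void main(final String[] args) {
        final List<FakeStore> shared = sharedAcrossTasks(new FakeStore("my-store"), 2);
        final List<FakeStore> supplied = suppliedPerTask(() -> new FakeStore("my-store"), 2);

        System.out.println(shared.get(0) == shared.get(1));     // true: same instance
        System.out.println(supplied.get(0) == supplied.get(1)); // false: one per task
    }
}
```

Since Tasks clearly don't share stores, I assume something equivalent to the
second method happens per Task; I just can't find where.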

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> I don't think we need to *require* a constructor accept the TaskId, but we
> would definitely make sure that the RocksDB state store changes its
> constructor to one that accepts the TaskID (which we can do without
> deprecation since it's an internal API), and custom state stores can just
> decide for themselves whether they want to opt-in/use the TaskId param
> or not. I mean custom state stores would have to opt-in anyways by
> implementing the new StoreSupplier#get(TaskId) API and the only
> reason to do that would be to have created a constructor that accepts
> a TaskId.
>
> Just to be super clear about the proposal, this is what I had in mind.
> It's actually fairly simple and wouldn't add much to the scope of the
> KIP (I think -- if it turns out to be more complicated than I'm assuming,
> we should definitely do whatever has the smallest LOE to get this done).
>
> Anyways, the (only) public API changes would be to add this new
> method to the StoreSupplier API:
>
> default T get(final TaskId taskId) {
>     return get();
> }
>
> We can decide whether or not to deprecate the old #get but it's not
> really necessary and might cause a lot of turmoil, so I'd personally
> say we just leave both APIs in place.
>
> And that's it for public API changes! Internally, we would just adapt
> each of the rocksdb StoreSupplier classes to implement this new
> API. So for example with the RocksDBKeyValueBytesStoreSupplier,
> we just add
>
> @Override
> public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
>     return returnTimestampedStore ?
>         new RocksDBTimestampedStore(name, metricsScope(), taskId) :
>         new RocksDBStore(name, metricsScope(), taskId);
> }
>
> And of course add the TaskId parameter to each of the actual
> state store constructors returned here.
>
> Does that make sense? It's entirely possible I'm missing something
> important here, but I think this would be a pretty small addition that
> would solve the problem you mentioned earlier while also being
> useful to anyone who uses custom state stores.
>
> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi Sophie,
> >
> > Interesting idea! Although what would that mean for the StateStore
> > interface? Obviously we can't require that the constructor take the
> TaskId.
> > Is it enough to add the parameter to the StoreSupplier?
> >
> > Would doing this be in-scope for this KIP, or are we over-complicating
> it?
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman <sop...@responsive.dev
> >
> > wrote:
> >
> > > Somewhat minor point overall, but it actually drives me crazy that you
> > > can't get access to the taskId of a StateStore until #init is called.
> > This
> > > has caused me a huge headache personally (since the same is true for
> > > processors and I was trying to do something that's probably too hacky
> to
> > > actually complain about here lol)
> > >
> > > Can we just change the StateStoreSupplier to receive and pass along the
> > > taskId when creating a new store? Presumably by adding a new version of
> > the
> > > #get method that takes in a taskId parameter? We can have it default to
> > > invoking the old one for compatibility reasons and it should be
> > completely
> > > safe to tack on.
> > >
> > > Would also prefer the same for a ProcessorSupplier, but that's
> definitely
> > > outside the scope of this KIP
> > >
> > > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > >
> > > > On further thought, it's clear that this can't work for one simple
> > > reason:
> > > > StateStores don't know their associated TaskId (and hence, their
> > > > StateDirectory) until the init() call. Therefore, committedOffset()
> > can't
> > > > be called before init(), unless we also added a StateStoreContext
> > > argument
> > > > to committedOffset(), which I think might be trying to shoehorn too
> > much
> > > > into committedOffset().
> > > >
> > > > I still don't like the idea of the Streams engine maintaining the
> cache
> > > of
> > > > changelog offsets independently of stores, mostly because of the
> > > > maintenance burden of the code duplication, but it looks like we'll
> > have
> > > to
> > > > live with it.
> > > >
> > > > Unless you have any better ideas?
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Bruno,
> > > > >
> > > > > Immediately after I sent my response, I looked at the codebase and
> > came
> > > > to
> > > > > the same conclusion. If it's possible at all, it will need to be
> done
> > > by
> > > > > creating temporary StateManagers and StateStores during rebalance.
> I
> > > > think
> > > > > it is possible, and probably not too expensive, but the devil will
> be
> > > in
> > > > > the detail.
> > > > >
> > > > > I'll try to find some time to explore the idea to see if it's
> > possible
> > > > and
> > > > > report back, because we'll need to determine this before we can
> vote
> > on
> > > > the
> > > > > KIP.
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org>
> > > wrote:
> > > > >
> > > > >> Hi Nick,
> > > > >>
> > > > > >> Thanks for reacting to my comments so quickly!
> > > > >>
> > > > >>
> > > > >> 2.
> > > > >> Some thoughts on your proposal.
> > > > >> State managers (and state stores) are parts of tasks. If the task
> is
> > > not
> > > > >> assigned locally, we do not create those tasks. To get the offsets
> > > with
> > > > >> your approach, we would need to either create kind of inactive
> tasks
> > > > >> besides active and standby tasks or store and manage state
> managers
> > of
> > > > >> non-assigned tasks differently than the state managers of assigned
> > > > >> tasks. Additionally, the cleanup thread that removes unassigned
> task
> > > > >> directories needs to concurrently delete those inactive tasks or
> > > > >> task-less state managers of unassigned tasks. This seems all quite
> > > messy
> > > > >> to me.
> > > > >> Could we create those state managers (or state stores) for locally
> > > > >> existing but unassigned tasks on demand when
> > > > >> TaskManager#getTaskOffsetSums() is executed? Or have a different
> > > > >> encapsulation for the unused task directories?
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 4/10/24 11:31 AM, Nick Telford wrote:
> > > > >> > Hi Bruno,
> > > > >> >
> > > > >> > Thanks for the review!
> > > > >> >
> > > > >> > 1, 4, 5.
> > > > >> > Done
> > > > >> >
> > > > >> > 3.
> > > > >> > You're right. I've removed the offending paragraph. I had
> > originally
> > > > >> > adapted this from the guarantees outlined in KIP-892. But it's
> > > > >> difficult to
> > > > >> > provide these guarantees without the KIP-892 transaction
> buffers.
> > > > >> Instead,
> > > > >> > we'll add the guarantees back into the JavaDoc when KIP-892
> lands.
> > > > >> >
> > > > >> > 2.
> > > > >> > Good point! This is the only part of the KIP that was
> > > (significantly)
> > > > >> > changed when I extracted it from KIP-892. My prototype currently
> > > > >> maintains
> > > > >> > this "cache" of changelog offsets in .checkpoint, but doing so
> > > becomes
> > > > >> very
> > > > >> > messy. My intent with this change was to try to better
> encapsulate
> > > > this
> > > > >> > offset "caching", especially for StateStores that can cheaply
> > > provide
> > > > >> the
> > > > >> > offsets stored directly in them without needing to duplicate
> them
> > in
> > > > >> this
> > > > >> > cache.
> > > > >> >
> > > > >> > It's clear some more work is needed here to better encapsulate
> > this.
> > > > My
> > > > >> > immediate thought is: what if we construct *but don't
> initialize*
> > > the
> > > > >> > StateManager and StateStores for every Task directory on-disk?
> > That
> > > > >> should
> > > > >> > still be quite cheap to do, and would enable us to query the
> > offsets
> > > > for
> > > > >> > all on-disk stores, even if they're not open. If the
> StateManager
> > > > (aka.
> > > > >> > ProcessorStateManager/GlobalStateManager) proves too expensive
> to
> > > hold
> > > > >> open
> > > > >> > for closed stores, we could always have a "StubStateManager" in
> > its
> > > > >> place,
> > > > >> > that enables the querying of offsets, but nothing else?
> > > > >> >
> > > > >> > IDK, what do you think?
> > > > >> >
> > > > >> > Regards,
> > > > >> >
> > > > >> > Nick
> > > > >> >
> > > > >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org>
> > > > wrote:
> > > > >> >
> > > > >> >> Hi Nick,
> > > > >> >>
> > > > >> >> Thanks for breaking out the KIP from KIP-892!
> > > > >> >>
> > > > > >> >> Here are a couple of comments/questions:
> > > > >> >>
> > > > >> >> 1.
> > > > >> >> In Kafka Streams, we have a design guideline which says to not
> > use
> > > > the
> > > > >> >> "get"-prefix for getters on the public API. Could you please
> > change
> > > > >> >> getCommittedOffsets() to committedOffsets()?
> > > > >> >>
> > > > >> >>
> > > > >> >> 2.
> > > > >> >> It is not clear to me how TaskManager#getTaskOffsetSums()
> should
> > > read
> > > > >> >> offsets of tasks the stream thread does not own but that have a
> > > state
> > > > >> >> directory on the Streams client by calling
> > > > >> >> StateStore#getCommittedOffsets(). If the thread does not own a
> > task
> > > > it
> > > > > >> >> also does not create any state stores for the task, which means
> > > there
> > > > >> is
> > > > >> >> no state store on which to call getCommittedOffsets().
> > > > >> >> I would have rather expected that a checkpoint file is written
> > for
> > > > all
> > > > >> >> state stores on close -- not only for the RocksDBStore -- and
> > that
> > > > this
> > > > >> >> checkpoint file is read in TaskManager#getTaskOffsetSums() for
> > the
> > > > >> tasks
> > > > >> >> that have a state directory on the client but are not currently
> > > > >> assigned
> > > > >> >> to any stream thread of the Streams client.
> > > > >> >>
> > > > >> >>
> > > > >> >> 3.
> > > > >> >> In the javadocs for commit() you write
> > > > >> >>
> > > > >> >> "... all writes since the last commit(Map), or since
> > > init(StateStore)
> > > > >> >> *MUST* be available to readers, even after a restart."
> > > > >> >>
> > > > >> >> This is only true for a clean close before the restart, isn't
> it?
> > > > >> >> If the task fails with a dirty close, Kafka Streams cannot
> > > guarantee
> > > > >> >> that the in-memory structures of the state store (e.g. memtable
> > in
> > > > the
> > > > >> >> case of RocksDB) are flushed so that the records and the
> > committed
> > > > >> >> offsets are persisted.
> > > > >> >>
> > > > >> >>
> > > > >> >> 4.
> > > > >> >> The wrapper that provides the legacy checkpointing behavior is
> > > > actually
> > > > >> >> an implementation detail. I would remove it from the KIP, but
> > still
> > > > >> >> state that the legacy checkpointing behavior will be supported
> > when
> > > > the
> > > > >> >> state store does not manage the checkpoints.
> > > > >> >>
> > > > >> >>
> > > > >> >> 5.
> > > > >> >> Regarding the metrics, could you please add the tags, and the
> > > > recording
> > > > >> >> level (DEBUG or INFO) as done in KIP-607 or KIP-444.
> > > > >> >>
> > > > >> >>
> > > > >> >> Best,
> > > > >> >> Bruno
> > > > >> >>
> > > > >> >> On 4/7/24 5:35 PM, Nick Telford wrote:
> > > > >> >>> Hi everyone,
> > > > >> >>>
> > > > >> >>> Based on some offline discussion, I've split out the "Atomic
> > > > >> >> Checkpointing"
> > > > >> >>> section from KIP-892: Transactional Semantics for StateStores,
> > > into
> > > > >> its
> > > > >> >> own
> > > > >> >>> KIP
> > > > >> >>>
> > > > >> >>> KIP-1035: StateStore managed changelog offsets
> > > > >> >>>
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> > > > >> >>>
> > > > >> >>> While KIP-892 was adopted *with* the changes outlined in
> > KIP-1035,
> > > > >> these
> > > > >> >>> changes were always the most contentious part, and continued
> to
> > > spur
> > > > >> >>> discussion even after KIP-892 was adopted.
> > > > >> >>>
> > > > >> >>> All the changes introduced in KIP-1035 have been removed from
> > > > KIP-892,
> > > > >> >> and
> > > > >> >>> a hard dependency on KIP-1035 has been added to KIP-892 in
> their
> > > > >> place.
> > > > >> >>>
> > > > >> >>> I'm hopeful that with some more focus on this set of changes,
> we
> > > can
> > > > >> >>> deliver something that we're all happy with.
> > > > >> >>>
> > > > >> >>> Regards,
> > > > >> >>> Nick
> > > > >> >>>
> > > > >> >>
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
