Hi everyone,

Sorry I haven't got around to updating the KIP yet. Now that I've wrapped
up KIP-989, I'm going to be working on 1035 starting today.

I'll update the KIP first, and then call a vote.

Regards,
Nick

On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org> wrote:

> Totally agree on moving forward and starting the VOTE!
>
> However, the KIP should be updated with the new info before starting the
> VOTE.
>
> Best,
> Bruno
>
> On 5/29/24 2:36 AM, Matthias J. Sax wrote:
> > Sounds like a good plan. -- I think we are still wrapping up 3.8
> > release, but would also like to move forward with with one.
> >
> > Should we start a VOTE?
> >
> > For merging PRs we need to wait after code freeze, and 3.8 branch was
> > but. But we could start reviewing PRs before this already.
> >
> >
> > -Matthias
> >
> > On 5/17/24 3:05 AM, Nick Telford wrote:
> >> Hi everyone,
> >>
> >> As discussed on the Zoom call, we're going to handle rebalance
> >> meta-data by:
> >>
> >> - On start-up, Streams will open each store and read its changelog
> >> offsets
> >> into an in-memory cache. This cache will be shared among all
> >> StreamThreads.
> >> - On rebalance, the cache will be consulted for Task offsets for any
> Task
> >> that is not active on any instance-local StreamThreads. If the Task is
> >> active on *any* instance-local StreamThread, we will report the Task
> >> lag as
> >> "up to date" (i.e. -1), because we know that the local state is
> currently
> >> up-to-date.
> >>
> >> We will avoid caching offsets across restarts in the legacy
> ".checkpoint"
> >> file, so that we can eliminate the logic for handling this class. If
> >> performance of opening/closing many state stores is poor, we can
> >> parallelise it by forking off a thread for each Task directory when
> >> reading
> >> the offsets.
> >>
> >> I'll update the KIP later today to reflect this design, but I will try
> to
> >> keep it high-level, so that the exact implementation can vary.
> >>
> >> Regards,
> >>
> >> Nick
> >>
> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <
> sop...@responsive.dev>
> >> wrote:
> >>
> >>> 103: I like the idea of immediately deprecating #managesOffsets and
> >>> aiming
> >>> to make offset management mandatory in the long run. I assume we
> >>> would also
> >>> log a warning for any custom stores that return "false" from this
> >>> method to
> >>> encourage custom store implementations to start doing so? My only
> >>> question/concern is that if we want folks to start managing their own
> >>> offsets then we should make this transition easy for them, perhaps by
> >>> exposing some public utility APIs for things that are currently
> >>> handled by
> >>> Kafka Streams such as reading/writing checkpoint files. Maybe it
> >>> would be
> >>> useful to include a small example in the KIP of what it would
> >>> actually mean
> >>> to "manage your own offsets" -- I know (all too well) that plugging in
> >>> custom storage implementations is not easy and most people who do
> >>> this are
> >>> probably fairly advanced users, but offset management will be a
> >>> totally new
> >>> ballgame to most people people and this kind of feels like throwing
> them
> >>> off the deep end. We should at least provide a lifejacket via some
> >>> kind of
> >>> utility API and/or example
> >>>
> >>> 200. There's been a lot of back and forth on the rebalance
> metadata/task
> >>> lag computation question, so forgive me if I missed any part of this,
> >>> but I
> >>> think we've landed at the right idea here. To summarize: the "tl;dr"
> >>> explanation is that we'll write the checkpoint file only on close and
> >>> will
> >>> account for hard-crash scenarios by opening up the stores on startup
> and
> >>> writing a checkpoint file for any missing tasks. Does that sound about
> >>> right?
> >>>
> >>> A few clarifications:
> >>> I think we're all more or less on the same page here but just to be
> >>> absolutely clear, the task lags for each task directory found on disk
> >>> will
> >>> be reported by only one of the StreamThreads, and each StreamThread
> will
> >>> report lags only for tasks that it already owns or are not assigned
> >>> to any
> >>> other StreamThread in the client. In other words, we only need to get
> >>> the
> >>> task lag for completely unassigned/unlocked tasks, which means if
> >>> there is
> >>> a checkpoint file at all then it must be up-to-date, because there is
> no
> >>> other StreamThread actively writing to that state store (if so then
> only
> >>> that StreamThread would report lag for that particular task).
> >>>
> >>> This still leaves the "no checkpoint at all" case which as previously
> >>> mentioned can occur after a  hard-crash. Luckily we only have to worry
> >>> about this once, after starting up again following said hard crash.
> >>> We can
> >>> simply open up each of the state stores before ever joining the
> >>> group, get
> >>> the offsets from rocksdb, and write them to a new checkpoint file.
> After
> >>> that, we can depend on the checkpoints written at close and won't
> >>> have to
> >>> open up any stores that aren't already assigned for the reasons laid
> >>> out in
> >>> the paragraph above.
> >>>
> >>> As for the specific mechanism and which thread-does-what, since there
> >>> were
> >>> some questions, this is how I'm imagining the process:
> >>>
> >>>     1.   The general idea is that we simply go through each task
> >>> directories
> >>>     with state but no checkpoint file and open the StateStore, call
> >>>     #committedOffset, and then write it to the checkpoint file. We
> >>> can then
> >>>     close these stores and let things proceed as normal.
> >>>     2.  This only has to happen once, during startup, but we have two
> >>>     options:
> >>>        1. Do this from KafkaStreams#start, ie before we even create the
> >>>        StreamThreads
> >>>        2.  Do this from StreamThread#start, following a similar
> >>> lock-based
> >>>        approach to the one used #computeTaskLags, where each
> >>> StreamThread
> >>> just
> >>>        makes a pass over the task directories on disk and attempts to
> >>> lock
> >>> them
> >>>        one by one. If they obtain the lock, check whether there is
> state
> >>> but no
> >>>        checkpoint, and write the checkpoint if needed. If it can't grab
> >>> the lock,
> >>>        then we know one of the other StreamThreads must be handling the
> >>> checkpoint
> >>>        file for that task directory, and we can move on.
> >>>
> >>> Don't really feel too strongly about which approach is best,  doing
> >>> it in
> >>> KafkaStreams#start is certainly the most simple while doing it in the
> >>> StreamThread's startup is more efficient. If we're worried about
> >>> adding too
> >>> much weight to KafkaStreams#start then the 2nd option is probably best,
> >>> though slightly more complicated.
> >>>
> >>> Thoughts?
> >>>
> >>> On Tue, May 14, 2024 at 10:02 AM Nick Telford <nick.telf...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> Sorry for the delay in replying. I've finally now got some time to
> work
> >>> on
> >>>> this.
> >>>>
> >>>> Addressing Matthias's comments:
> >>>>
> >>>> 100.
> >>>> Good point. As Bruno mentioned, there's already
> >>> AbstractReadWriteDecorator
> >>>> which we could leverage to provide that protection. I'll add details
> on
> >>>> this to the KIP.
> >>>>
> >>>> 101,102.
> >>>> It looks like these points have already been addressed by Bruno. Let
> me
> >>>> know if anything here is still unclear or you feel needs to be
> detailed
> >>>> more in the KIP.
> >>>>
> >>>> 103.
> >>>> I'm in favour of anything that gets the old code removed sooner, but
> >>>> wouldn't deprecating an API that we expect (some) users to implement
> >>> cause
> >>>> problems?
> >>>> I'm thinking about implementers of custom StateStores, as they may be
> >>>> confused by managesOffsets() being deprecated, especially since they
> >>> would
> >>>> have to mark their implementation as @Deprecated in order to avoid
> >>> compile
> >>>> warnings.
> >>>> If deprecating an API *while it's still expected to be implemented* is
> >>>> something that's generally done in the project, then I'm happy to do
> so
> >>>> here.
> >>>>
> >>>> 104.
> >>>> I think this is technically possible, but at the cost of considerable
> >>>> additional code to maintain. Would we ever have a pathway to remove
> >>>> this
> >>>> downgrade code in the future?
> >>>>
> >>>>
> >>>> Regarding rebalance metadata:
> >>>> Opening all stores on start-up to read and cache their offsets is an
> >>>> interesting idea, especially if we can avoid re-opening the stores
> once
> >>> the
> >>>> Tasks have been assigned. Scalability shouldn't be too much of a
> >>>> problem,
> >>>> because typically users have a fairly short state.cleanup.delay, so
> the
> >>>> number of on-disk Task directories should rarely exceed the number of
> >>> Tasks
> >>>> previously assigned to that instance.
> >>>> An advantage of this approach is that it would also simplify
> StateStore
> >>>> implementations, as they would only need to guarantee that committed
> >>>> offsets are available when the store is open.
> >>>>
> >>>> I'll investigate this approach this week for feasibility and report
> >>>> back.
> >>>>
> >>>> I think that covers all the outstanding feedback, unless I missed
> >>> anything?
> >>>>
> >>>> Regards,
> >>>> Nick
> >>>>
> >>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org>
> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> I see what you mean.
> >>>>>
> >>>>> To sum up:
> >>>>>
> >>>>> With this KIP the .checkpoint file is written when the store closes.
> >>>>> That is when:
> >>>>> 1. a task moves away from Kafka Streams client
> >>>>> 2. Kafka Streams client shuts down
> >>>>>
> >>>>> A Kafka Streams client needs the information in the .checkpoint file
> >>>>> 1. on startup because it does not have any open stores yet.
> >>>>> 2. during rebalances for non-empty state directories of tasks that
> are
> >>>>> not assigned to the Kafka Streams client.
> >>>>>
> >>>>> With hard crashes, i.e., when the Streams client is not able to close
> >>>>> its state stores and write the .checkpoint file, the .checkpoint file
> >>>>> might be quite stale. That influences the next rebalance after
> >>>>> failover
> >>>>> negatively.
> >>>>>
> >>>>>
> >>>>> My conclusion is that Kafka Streams either needs to open the state
> >>>>> stores at start up or we write the checkpoint file more often.
> >>>>>
> >>>>> Writing the .checkpoint file during processing more often without
> >>>>> controlling the flush to disk would work. However, Kafka Streams
> would
> >>>>> checkpoint offsets that are not yet persisted on disk by the state
> >>>>> store. That is with a hard crash the offsets in the .checkpoint file
> >>>>> might be larger than the offsets checkpointed in the state store.
> That
> >>>>> might not be a problem if Kafka Streams uses the .checkpoint file
> only
> >>>>> to compute the task lag. The downside is that it makes the managing
> of
> >>>>> checkpoints more complex because now we have to maintain two
> >>>>> checkpoints: one for restoration and one for computing the task lag.
> >>>>> I think we should explore the option where Kafka Streams opens the
> >>> state
> >>>>> stores at start up to get the offsets.
> >>>>>
> >>>>> I also checked when Kafka Streams needs the checkpointed offsets to
> >>>>> compute the task lag during a rebalance. Turns out Kafka Streams
> needs
> >>>>> them before sending the join request. Now, I am wondering if opening
> >>> the
> >>>>> state stores of unassigned tasks whose state directory exists locally
> >>> is
> >>>>> actually such a big issue due to the expected higher latency since it
> >>>>> happens actually before the Kafka Streams client joins the rebalance.
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote:
> >>>>>> That's good questions... I could think of a few approaches, but I
> >>> admit
> >>>>>> it might all be a little bit tricky to code up...
> >>>>>>
> >>>>>> However if we don't solve this problem, I think this KIP does not
> >>>> really
> >>>>>> solve the core issue we are facing? In the end, if we rely on the
> >>>>>> `.checkpoint` file to compute a task assignment, but the
> >>> `.checkpoint`
> >>>>>> file can be arbitrary stale after a crash because we only write it
> >>> on a
> >>>>>> clean close, there would be still a huge gap that this KIP does not
> >>>>> close?
> >>>>>>
> >>>>>> For the case in which we keep the checkpoint file, this KIP would
> >>> still
> >>>>>> help for "soft errors" in which KS can recover, and roll back the
> >>>> store.
> >>>>>> A significant win for sure. -- But hard crashes would still be an
> >>>>>> problem? We might assign tasks to "wrong" instance, ie, which are
> not
> >>>>>> most up to date, as the checkpoint information could be very
> >>> outdated?
> >>>>>> Would we end up with a half-baked solution? Would this be good
> enough
> >>>> to
> >>>>>> justify the introduced complexity? In the, for soft failures it's
> >>> still
> >>>>>> a win. Just want to make sure we understand the limitations and make
> >>> an
> >>>>>> educated decision.
> >>>>>>
> >>>>>> Or do I miss something?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>>
> >>>>>>> 200:
> >>>>>>> I like the idea in general. However, it is not clear to me how the
> >>>>>>> behavior should be with multiple stream threads in the same Kafka
> >>>>>>> Streams client. What stream thread opens which store? How can a
> >>> stream
> >>>>>>> thread pass an open store to another stream thread that got the
> >>>>>>> corresponding task assigned? How does a stream thread know that a
> >>> task
> >>>>>>> was not assigned to any of the stream threads of the Kafka Streams
> >>>>>>> client? I have the feeling we should just keep the .checkpoint file
> >>> on
> >>>>>>> close for now to unblock this KIP and try to find a solution to get
> >>>>>>> totally rid of it later.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
> >>>>>>>> 101: Yes, but what I am saying is, that we don't need to flush the
> >>>>>>>> .position file to disk periodically, but only maintain it in main
> >>>>>>>> memory, and only write it to disk on close() to preserve it across
> >>>>>>>> restarts. This way, it would never be ahead, but might only lag?
> >>> But
> >>>>>>>> with my better understanding about (102) it might be mood
> anyway...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes sense.
> >>>>>>>> Might be something to be worth calling out explicitly in the KIP
> >>>>>>>> writeup. -- Now that I realize that the position is tracked inside
> >>>>>>>> the store (not outside as the changelog offsets) it makes much
> more
> >>>>>>>> sense to pull position into RocksDB itself. In the end, it's
> >>> actually
> >>>>>>>> a "store implementation" detail how it tracks the position (and
> >>> kinda
> >>>>>>>> leaky abstraction currently, that we re-use the checkpoint file
> >>>>>>>> mechanism to track it and flush to disk).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 200: I was thinking about this a little bit more, and maybe it's
> >>> not
> >>>>>>>> too bad? When KS starts up, we could upon all stores we find on
> >>> local
> >>>>>>>> disk pro-actively, and keep them all open until the first
> rebalance
> >>>>>>>> finishes: For tasks we get assigned, we hand in the already opened
> >>>>>>>> store (this would amortize the cost to open the store before the
> >>>>>>>> rebalance) and for non-assigned tasks, we know the offset
> >>> information
> >>>>>>>> won't change and we could just cache it in-memory for later reuse
> >>>>>>>> (ie, next rebalance) and close the store to free up resources? --
> >>>>>>>> Assuming that we would get a large percentage of opened stores
> >>>>>>>> assigned as tasks anyway, this could work?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 101:
> >>>>>>>>> Let's assume a RocksDB store, but I think the following might be
> >>>>>>>>> true also for other store implementations. With this KIP, if
> Kafka
> >>>>>>>>> Streams commits the offsets, the committed offsets will be stored
> >>> in
> >>>>>>>>> an in-memory data structure (i.e. the memtable) and stay there
> >>> until
> >>>>>>>>> RocksDB decides that it is time to persist its in-memory data
> >>>>>>>>> structure. If Kafka Streams writes its position to the .position
> >>>>>>>>> file during a commit and a crash happens before RocksDB persist
> >>> the
> >>>>>>>>> memtable then the position in the .position file is ahead of the
> >>>>>>>>> persisted offset. If IQ is done between the crash and the state
> >>>>>>>>> store fully restored the changelog, the position might tell IQ
> >>> that
> >>>>>>>>> the state store is more up-to-date than it actually is.
> >>>>>>>>> In contrast, if Kafka Streams handles persisting positions the
> >>> same
> >>>>>>>>> as persisting offset, the position should always be consistent
> >>> with
> >>>>>>>>> the offset, because they are persisted together.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 102:
> >>>>>>>>> I am confused about your confusion which tells me that we are
> >>>>>>>>> talking about two different things.
> >>>>>>>>> You asked
> >>>>>>>>>
> >>>>>>>>> "Do you intent to add this information [i.e. position] to the map
> >>>>>>>>> passed via commit(final Map<TopicPartition, Long>
> >>>> changelogOffsets)?"
> >>>>>>>>>
> >>>>>>>>> and with what I wrote I meant that we do not need to pass the
> >>>>>>>>> position into the implementation of the StateStore interface
> since
> >>>>>>>>> the position is updated within the implementation of the
> >>> StateStore
> >>>>>>>>> interface (e.g. RocksDBStore [1]). My statement describes the
> >>>>>>>>> behavior now, not the change proposed in this KIP, so it does not
> >>>>>>>>> contradict what is stated in the KIP.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 200:
> >>>>>>>>> This is about Matthias' main concern about rebalance metadata.
> >>>>>>>>> As far as I understand the KIP, Kafka Streams will only use the
> >>>>>>>>> .checkpoint files to compute the task lag for unassigned tasks
> >>> whose
> >>>>>>>>> state is locally available. For assigned tasks, it will use the
> >>>>>>>>> offsets managed by the open state store.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>>
> >>>>>
> >>>>
> >>>
> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
> >>>>>>>>>
> >>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
> >>>>>>>>>> Thanks Bruno.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 101: I think I understand this better now. But just want to make
> >>>>>>>>>> sure I do. What do you mean by "they can diverge" and
> "Recovering
> >>>>>>>>>> after a failure might load inconsistent offsets and positions."
> >>>>>>>>>>
> >>>>>>>>>> The checkpoint is the offset from the changelog, while the
> >>> position
> >>>>>>>>>> is the offset from the upstream source topic, right? -- In the
> >>> end,
> >>>>>>>>>> the position is about IQ, and if we fail to update it, it only
> >>>>>>>>>> means that there is some gap when we might not be able to query
> a
> >>>>>>>>>> standby task, because we think it's not up-to-date enough even
> if
> >>>>>>>>>> it is, which would resolve itself soon? Ie, the position might
> >>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this lag
> >>>>>>>>>> would be highly problematic?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 102: I am confused.
> >>>>>>>>>>
> >>>>>>>>>>> The position is maintained inside the state store, but is
> >>>>>>>>>>> persisted in the .position file when the state store closes.
> >>>>>>>>>>
> >>>>>>>>>> This contradicts the KIP:
> >>>>>>>>>>
> >>>>>>>>>>>   these position offsets will be stored in RocksDB, in the same
> >>>>>>>>>>> column family as the changelog offsets, instead of the
> .position
> >>>>> file
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> My main concern is currently about rebalance metadata -- opening
> >>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow the
> >>>> KIP:
> >>>>>>>>>>
> >>>>>>>>>>> We will do this under EOS by updating the .checkpoint file
> >>>>>>>>>>> whenever a store is close()d.
> >>>>>>>>>>
> >>>>>>>>>> It seems, having the offset inside RocksDB does not help us at
> >>> all?
> >>>>>>>>>> In the end, when we crash, we don't want to lose the state, but
> >>>>>>>>>> when we update the .checkpoint only on a clean close, the
> >>>>>>>>>> .checkpoint might be stale (ie, still contains the checkpoint
> >>> when
> >>>>>>>>>> we opened the store when we got a task assigned).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote:
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> 100
> >>>>>>>>>>> I think we already have such a wrapper. It is called
> >>>>>>>>>>> AbstractReadWriteDecorator.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 101
> >>>>>>>>>>> Currently, the position is checkpointed when a offset
> checkpoint
> >>>>>>>>>>> is written. If we let the state store manage the committed
> >>>>>>>>>>> offsets, we need to also let the state store also manage the
> >>>>>>>>>>> position otherwise they might diverge. State store managed
> >>> offsets
> >>>>>>>>>>> can get flushed (i.e. checkpointed) to the disk when the state
> >>>>>>>>>>> store decides to flush its in-memory data structures, but the
> >>>>>>>>>>> position is only checkpointed at commit time. Recovering after
> a
> >>>>>>>>>>> failure might load inconsistent offsets and positions.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 102
> >>>>>>>>>>> The position is maintained inside the state store, but is
> >>>>>>>>>>> persisted in the .position file when the state store closes.
> The
> >>>>>>>>>>> only public interface that uses the position is IQv2 in a
> >>>>>>>>>>> read-only mode. So the position is only updated within the
> state
> >>>>>>>>>>> store and read from IQv2. No need to add anything to the public
> >>>>>>>>>>> StateStore interface.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 103
> >>>>>>>>>>> Deprecating managesOffsets() right away might be a good idea.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 104
> >>>>>>>>>>> I agree that we should try to support downgrades without wipes.
> >>> At
> >>>>>>>>>>> least Nick should state in the KIP why we do not support it.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote:
> >>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows, that
> >>> it
> >>>>>>>>>>>> is a complex beast by itself, so worth to discuss by its own.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Couple of question / comment:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be
> called
> >>>>>>>>>>>> by users" -- I would propose to put a guard in place for this,
> >>> by
> >>>>>>>>>>>> either throwing an exception (preferable) or adding a no-op
> >>>>>>>>>>>> implementation (at least for our own stores, by wrapping them
> >>> --
> >>>>>>>>>>>> we cannot enforce it for custom stores I assume), and document
> >>>>>>>>>>>> this contract explicitly.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually need
> >>>>>>>>>>>> this? The KIP says "To ensure consistency with the committed
> >>> data
> >>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow? Can
> >>> you
> >>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't work?
> >>>>>>>>>>>>
> >>>>>>>>>>>>> If it's possible at all, it will need to be done by
> >>>>>>>>>>>>> creating temporary StateManagers and StateStores during
> >>>>>>>>>>>>> rebalance. I think
> >>>>>>>>>>>>> it is possible, and probably not too expensive, but the devil
> >>>>>>>>>>>>> will be in
> >>>>>>>>>>>>> the detail.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This sounds like a significant overhead to me. We know that
> >>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus opening
> >>>>>>>>>>>> RocksDB to get this information might slow down rebalances
> >>>>>>>>>>>> significantly.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 102: It's unclear to me, how `.position` information is added.
> >>>>>>>>>>>> The KIP only says: "position offsets will be stored in
> RocksDB,
> >>>>>>>>>>>> in the same column family as the changelog offsets". Do you
> >>>>>>>>>>>> intent to add this information to the map passed via
> >>>>>>>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`?
> The
> >>>>>>>>>>>> KIP should describe this in more detail. Also, if my
> assumption
> >>>>>>>>>>>> is correct, we might want to rename the parameter and also
> >>> have a
> >>>>>>>>>>>> better JavaDoc description?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all stores
> >>>>>>>>>>>> (including custom stores) manage their offsets internally?
> >>>>>>>>>>>> Maintaining both options and thus both code paths puts a
> burden
> >>>>>>>>>>>> on everyone and make the code messy. I would strongly prefer
> if
> >>>>>>>>>>>> we could have mid-term path to get rid of supporting both.  --
> >>>>>>>>>>>> For this case, we should deprecate the newly added
> >>>>>>>>>>>> `managesOffsets()` method right away, to point out that we
> >>> intend
> >>>>>>>>>>>> to remove it. If it's mandatory to maintain offsets for
> stores,
> >>>>>>>>>>>> we won't need this method any longer. In memory stores can
> just
> >>>>>>>>>>>> return null from #committedOffset().
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 104 "downgrading": I think it might be worth to add support
> for
> >>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging
> >>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling bounce
> >>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from` set
> >>> to
> >>>>>>>>>>>> a lower version, telling the runtime to do the cleanup on
> >>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into
> >>>>>>>>>>>> `.checkpoint` and `.position` file, and the newly added CL is
> >>>>>>>>>>>> deleted). In a second, rolling bounce, the old code would be
> >>> able
> >>>>>>>>>>>> to open RocksDB. -- I understand that this implies much more
> >>>>>>>>>>>> work, but downgrade seems to be common enough, that it might
> be
> >>>>>>>>>>>> worth it? Even if we did not always support this in the past,
> >>> we
> >>>>>>>>>>>> have the face the fact that KS is getting more and more
> adopted
> >>>>>>>>>>>> and as a more mature product should support this?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> How should we proceed here?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. with the plain .checkpoint file
> >>>>>>>>>>>>> 2. with a way to use the state store interface on unassigned
> >>> but
> >>>>>>>>>>>>> locally existing task state
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and
> will
> >>>>>>>>>>>>> give us the benefits of transactional state stores sooner. We
> >>>>>>>>>>>>> should consider the interface approach afterwards, though.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>> Hi Nick and Sophie,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the task ID is not enough to create a state store
> >>> that
> >>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag
> >>> computation
> >>>>>>>>>>>>>> during rebalancing. The state store also needs the state
> >>>>>>>>>>>>>> directory so that it knows where to find the information
> that
> >>>>>>>>>>>>>> it needs to return from changelogOffsets().
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In general, I think we should proceed with the plain
> >>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state store
> >>>>>>>>>>>>>> solution later since it seems it is not that
> straightforward.
> >>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better
> >>>>>>>>>>>>>> understand what would be needed for the state store
> solution.
> >>>>>>>>>>>>>> Nick, let us know your decision.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding your question about the state store instance. I am
> >>>>>>>>>>>>>> not too familiar with that part of the code, but I think the
> >>>>>>>>>>>>>> state store is build when the processor topology is build
> and
> >>>>>>>>>>>>>> the processor topology is build per stream task. So there is
> >>>>>>>>>>>>>> one instance of processor topology and state store per
> stream
> >>>>>>>>>>>>>> task. Try to follow the call in [1].
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>
> >>>>>
> >>>>
> >>>
> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out is
> >>> how
> >>>>>>>>>>>>>>> per-Task
> >>>>>>>>>>>>>>> StateStore instances are constructed.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It looks like we construct one StateStore instance for the
> >>>>>>>>>>>>>>> whole Topology
> >>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into
> >>>>>>>>>>>>>>> ProcessorStateManager (via
> >>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This can't be the case though, otherwise multiple
> partitions
> >>>>>>>>>>>>>>> of the same
> >>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore
> >>>>>>>>>>>>>>> instance, which
> >>>>>>>>>>>>>>> they don't.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What am I missing?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
> >>>>>>>>>>>>>>> <sop...@responsive.dev>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I don't think we need to *require* a constructor accept
> the
> >>>>>>>>>>>>>>>> TaskId, but we
> >>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state store
> >>>>>>>>>>>>>>>> changes its
> >>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we can
> do
> >>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>> deprecation since its an internal API), and custom state
> >>>>>>>>>>>>>>>> stores can just
> >>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use the
> >>>>>>>>>>>>>>>> TaskId param
> >>>>>>>>>>>>>>>> or not. I mean custom state stores would have to opt-in
> >>>>>>>>>>>>>>>> anyways by
> >>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and the
> >>>> only
> >>>>>>>>>>>>>>>> reason to do that would be to have created a constructor
> >>> that
> >>>>>>>>>>>>>>>> accepts
> >>>>>>>>>>>>>>>> a TaskId
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is what I
> >>> had
> >>>>>>>>>>>>>>>> in mind.
> >>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to the
> >>>>>>>>>>>>>>>> scope of the
> >>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated
> than
> >>>>>>>>>>>>>>>> I'm assuming,
> >>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE to
> >>> get
> >>>>>>>>>>>>>>>> this done
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add
> this
> >>>> new
> >>>>>>>>>>>>>>>> method to the StoreSupplier API:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> default T get(final TaskId taskId) {
> >>>>>>>>>>>>>>>>       return get();
> >>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get but
> >>>>>>>>>>>>>>>> it's not
> >>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so I'd
> >>>>>>>>>>>>>>>> personally
> >>>>>>>>>>>>>>>> say we just leave both APIs in place.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we would
> >>>>>>>>>>>>>>>> just adapt
> >>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement
> this
> >>>> new
> >>>>>>>>>>>>>>>> API. So for example with the
> >>>> RocksDBKeyValueBytesStoreSupplier,
> >>>>>>>>>>>>>>>> we just add
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @Override
> >>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId
> >>> taskId)
> >>>> {
> >>>>>>>>>>>>>>>>       return returnTimestampedStore ?
> >>>>>>>>>>>>>>>>           new RocksDBTimestampedStore(name,
> metricsScope(),
> >>>>>>>>>>>>>>>> taskId) :
> >>>>>>>>>>>>>>>>           new RocksDBStore(name, metricsScope(), taskId);
> >>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the
> >>> actual
> >>>>>>>>>>>>>>>> state store constructors returned here.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Does that make sense? It's entirely possible I'm missing
> >>>>>>>>>>>>>>>> something
> >>>>>>>>>>>>>>>> important here, but I think this would be a pretty small
> >>>>>>>>>>>>>>>> addition that
> >>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while also
> >>>> being
> >>>>>>>>>>>>>>>> useful to anyone who uses custom state stores.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
> >>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Sophie,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for the
> >>>>>>>>>>>>>>>>> StateStore
> >>>>>>>>>>>>>>>>> interface? Obviously we can't require that the
> constructor
> >>>>>>>>>>>>>>>>> take the
> >>>>>>>>>>>>>>>> TaskId.
> >>>>>>>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we
> >>>>>>>>>>>>>>>>> over-complicating
> >>>>>>>>>>>>>>>> it?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
> >>>>>>>>>>>>>>>>> <sop...@responsive.dev
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives me
> >>>>>>>>>>>>>>>>>> crazy that you
> >>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until
> >>> #init
> >>>>>>>>>>>>>>>>>> is called.
> >>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the same
> >>> is
> >>>>>>>>>>>>>>>>>> true for
> >>>>>>>>>>>>>>>>>> processors and I was trying to do something that's
> >>> probably
> >>>>>>>>>>>>>>>>>> too hacky
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> actually complain about here lol)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive and
> >>>>>>>>>>>>>>>>>> pass along the
> >>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by adding a
> >>>>>>>>>>>>>>>>>> new version of
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can
> have
> >>>>>>>>>>>>>>>>>> it default to
> >>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and it
> >>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>> completely
> >>>>>>>>>>>>>>>>>> safe to tack on.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, but
> >>>>> that's
> >>>>>>>>>>>>>>>> definitely
> >>>>>>>>>>>>>>>>>> outside the scope of this KIP
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford
> >>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work for
> >>>>>>>>>>>>>>>>>>> one simple
> >>>>>>>>>>>>>>>>>> reason:
> >>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and
> >>> hence,
> >>>>>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore,
> >>>>>>>>>>>>>>>>>>> committedOffset()
> >>>>>>>>>>>>>>>>> can't
> >>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a
> >>>>>>>>>>>>>>>>>>> StateStoreContext
> >>>>>>>>>>>>>>>>>> argument
> >>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying to
> >>>>>>>>>>>>>>>>>>> shoehorn too
> >>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>> into committedOffset().
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine
> >>>>>>>>>>>>>>>>>>> maintaining the
> >>>>>>>>>>>>>>>> cache
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly
> >>> because
> >>>>>>>>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it
> looks
> >>>>>>>>>>>>>>>>>>> like we'll
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> live with it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Unless you have any better ideas?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford
> >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at the
> >>>>>>>>>>>>>>>>>>>> codebase and
> >>>>>>>>>>>>>>>>> came
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it will
> >>>>>>>>>>>>>>>>>>>> need to be
> >>>>>>>>>>>>>>>> done
> >>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores
> during
> >>>>>>>>>>>>>>>>>>>> rebalance.
> >>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but
> the
> >>>>>>>>>>>>>>>>>>>> devil will
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> the detail.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to see
> >>> if
> >>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this
> >>> before
> >>>>>>>>>>>>>>>>>>>> we can
> >>>>>>>>>>>>>>>> vote
> >>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna
> >>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for reacting on my comments so quickly!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal.
> >>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of tasks.
> >>> If
> >>>>>>>>>>>>>>>>>>>>> the task
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To
> get
> >>>>>>>>>>>>>>>>>>>>> the offsets
> >>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create kind of
> >>>>>>>>>>>>>>>>>>>>> inactive
> >>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and manage
> >>>> state
> >>>>>>>>>>>>>>>> managers
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state
> managers
> >>>>>>>>>>>>>>>>>>>>> of assigned
> >>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that removes
> >>>>>>>>>>>>>>>>>>>>> unassigned
> >>>>>>>>>>>>>>>> task
> >>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those
> >>> inactive
> >>>>>>>>>>>>>>>>>>>>> tasks or
> >>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This
> >>> seems
> >>>>>>>>>>>>>>>>>>>>> all quite
> >>>>>>>>>>>>>>>>>> messy
> >>>>>>>>>>>>>>>>>>>>> to me.
> >>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state
> stores)
> >>>>>>>>>>>>>>>>>>>>> for locally
> >>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when
> >>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or have
> a
> >>>>>>>>>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the review!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1, 4, 5.
> >>>>>>>>>>>>>>>>>>>>>> Done
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending paragraph.
> I
> >>>> had
> >>>>>>>>>>>>>>>>> originally
> >>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in
> KIP-892.
> >>>>>>>>>>>>>>>>>>>>>> But it's
> >>>>>>>>>>>>>>>>>>>>> difficult to
> >>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892
> >>>> transaction
> >>>>>>>>>>>>>>>> buffers.
> >>>>>>>>>>>>>>>>>>>>> Instead,
> >>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc when
> >>>>>>>>>>>>>>>>>>>>>> KIP-892
> >>>>>>>>>>>>>>>> lands.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that
> was
> >>>>>>>>>>>>>>>>>> (significantly)
> >>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My
> >>> prototype
> >>>>>>>>>>>>>>>>>>>>>> currently
> >>>>>>>>>>>>>>>>>>>>> maintains
> >>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint,
> but
> >>>>>>>>>>>>>>>>>>>>>> doing so
> >>>>>>>>>>>>>>>>>> becomes
> >>>>>>>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>> messy. My intent with this change was to try to
> >>> better
> >>>>>>>>>>>>>>>> encapsulate
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that
> can
> >>>>>>>>>>>>>>>>>>>>>> cheaply
> >>>>>>>>>>>>>>>>>> provide
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing to
> >>>>>>>>>>>>>>>>>>>>>> duplicate
> >>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> cache.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to better
> >>>>>>>>>>>>>>>>>>>>>> encapsulate
> >>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>> My
> >>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but
> don't
> >>>>>>>>>>>>>>>> initialize*
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task
> directory
> >>>>>>>>>>>>>>>>>>>>>> on-disk?
> >>>>>>>>>>>>>>>>> That
> >>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to
> >>>>>>>>>>>>>>>>>>>>>> query the
> >>>>>>>>>>>>>>>>> offsets
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If the
> >>>>>>>>>>>>>>>> StateManager
> >>>>>>>>>>>>>>>>>>> (aka.
> >>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves too
> >>>>>>>>>>>>>>>>>>>>>> expensive
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> hold
> >>>>>>>>>>>>>>>>>>>>> open
> >>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a
> >>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in
> >>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>> place,
> >>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing
> >>> else?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> IDK, what do you think?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna
> >>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Here a couple of comments/questions:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline which
> >>>>>>>>>>>>>>>>>>>>>>> says to not
> >>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could
> >>> you
> >>>>>>>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how
> >>>>> TaskManager#getTaskOffsetSums()
> >>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own but
> >>>>>>>>>>>>>>>>>>>>>>> that have a
> >>>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling
> >>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread
> does
> >>>>>>>>>>>>>>>>>>>>>>> not own a
> >>>>>>>>>>>>>>>>> task
> >>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> does also not create any state stores for the task,
> >>>>>>>>>>>>>>>>>>>>>>> which means
> >>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>> no state store on which to call
> >>> getCommittedOffsets().
> >>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint file
> >>> is
> >>>>>>>>>>>>>>>>>>>>>>> written
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the
> >>> RocksDBStore
> >>>>>>>>>>>>>>>>>>>>>>> -- and
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in
> >>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but are
> >>> not
> >>>>>>>>>>>>>>>>>>>>>>> currently
> >>>>>>>>>>>>>>>>>>>>> assigned
> >>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or
> since
> >>>>>>>>>>>>>>>>>> init(StateStore)
> >>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a
> >>> restart."
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the
> >>>>>>>>>>>>>>>>>>>>>>> restart, isn't
> >>>>>>>>>>>>>>>> it?
> >>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka Streams
> >>>>>>>>>>>>>>>>>>>>>>> cannot
> >>>>>>>>>>>>>>>>>> guarantee
> >>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state store
> >>> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>> memtable
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records
> and
> >>>> the
> >>>>>>>>>>>>>>>>> committed
> >>>>>>>>>>>>>>>>>>>>>>> offsets are persisted.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy checkpointing
> >>>>>>>>>>>>>>>>>>>>>>> behavior is
> >>>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from
> the
> >>>>>>>>>>>>>>>>>>>>>>> KIP, but
> >>>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior will
> be
> >>>>>>>>>>>>>>>>>>>>>>> supported
> >>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the
> >>> tags,
> >>>>>>>>>>>>>>>>>>>>>>> and the
> >>>>>>>>>>>>>>>>>>> recording
> >>>>>>>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or
> KIP-444.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out
> >>> the
> >>>>>>>>>>>>>>>>>>>>>>>> "Atomic
> >>>>>>>>>>>>>>>>>>>>>>> Checkpointing"
> >>>>>>>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics for
> >>>>>>>>>>>>>>>>>>>>>>>> StateStores,
> >>>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>> own
> >>>>>>>>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes
> >>> outlined
> >>>>> in
> >>>>>>>>>>>>>>>>> KIP-1035,
> >>>>>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>>> changes were always the most contentious part, and
> >>>>>>>>>>>>>>>>>>>>>>>> continued
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> spur
> >>>>>>>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been
> >>>>>>>>>>>>>>>>>>>>>>>> removed from
> >>>>>>>>>>>>>>>>>>> KIP-892,
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to
> >>>>>>>>>>>>>>>>>>>>>>>> KIP-892 in
> >>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>>> place.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set
> >>> of
> >>>>>>>>>>>>>>>>>>>>>>>> changes,
> >>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>> deliver something that we're all happy with.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Reply via email to